Skip to content

Commit 7824e8a

Browse files
authored
Merge branch 'main' into rlamb/conditional-client-build-targets
2 parents 7f8fe65 + 5fe3ca7 commit 7824e8a

27 files changed

+2445
-140
lines changed

pkgs/sdk/server/contract-tests/SdkClientEntity.cs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,45 @@ private static Configuration BuildSdkConfig(SdkConfigParams sdkParams, ILogAdapt
514514
if (synchronizers.Count > 0)
515515
{
516516
dataSystemBuilder.Synchronizers(synchronizers.ToArray());
517+
518+
// Find the best synchronizer to use for FDv1 fallback configuration
519+
// Prefer polling synchronizers since FDv1 fallback is polling-based
520+
SdkConfigSynchronizerParams synchronizerForFallback = null;
521+
522+
// First, try to find a polling synchronizer (check secondary first, then primary)
523+
if (sdkParams.DataSystem.Synchronizers.Secondary != null &&
524+
sdkParams.DataSystem.Synchronizers.Secondary.Polling != null)
525+
{
526+
synchronizerForFallback = sdkParams.DataSystem.Synchronizers.Secondary;
527+
}
528+
else if (sdkParams.DataSystem.Synchronizers.Primary != null &&
529+
sdkParams.DataSystem.Synchronizers.Primary.Polling != null)
530+
{
531+
synchronizerForFallback = sdkParams.DataSystem.Synchronizers.Primary;
532+
}
533+
// If no polling synchronizer found, use primary synchronizer (could be streaming)
534+
else if (sdkParams.DataSystem.Synchronizers.Primary != null)
535+
{
536+
synchronizerForFallback = sdkParams.DataSystem.Synchronizers.Primary;
537+
}
538+
539+
if (synchronizerForFallback != null)
540+
{
541+
// Only configure global polling endpoints if we have a polling synchronizer with a custom base URI
542+
// This ensures the FDv1 fallback synchronizer uses the same base URI without overwriting
543+
// existing polling endpoint configuration
544+
if (synchronizerForFallback.Polling != null &&
545+
synchronizerForFallback.Polling.BaseUri != null)
546+
{
547+
endpoints.Polling(synchronizerForFallback.Polling.BaseUri);
548+
}
549+
550+
var fdv1Fallback = CreateFDv1FallbackSynchronizer(synchronizerForFallback);
551+
if (fdv1Fallback != null)
552+
{
553+
dataSystemBuilder.FDv1FallbackSynchronizer(fdv1Fallback);
554+
}
555+
}
517556
}
518557
}
519558

@@ -568,6 +607,33 @@ private static IComponentConfigurer<IDataSource> CreateSynchronizer(
568607
return null;
569608
}
570609

610+
private static IComponentConfigurer<IDataSource> CreateFDv1FallbackSynchronizer(
611+
SdkConfigSynchronizerParams synchronizer)
612+
{
613+
// FDv1 fallback synchronizer is always polling-based
614+
var fdv1PollingBuilder = DataSystemComponents.FDv1Polling();
615+
616+
// Configure polling interval if the synchronizer has polling configuration
617+
if (synchronizer.Polling != null)
618+
{
619+
if (synchronizer.Polling.PollIntervalMs.HasValue)
620+
{
621+
fdv1PollingBuilder.PollInterval(TimeSpan.FromMilliseconds(synchronizer.Polling.PollIntervalMs.Value));
622+
}
623+
// Note: FDv1 polling doesn't support ServiceEndpointsOverride, so base URI
624+
// will use the global service endpoints configuration
625+
}
626+
else if (synchronizer.Streaming != null)
627+
{
628+
// For streaming synchronizers, we still create a polling fallback
629+
// Use default polling interval since streaming doesn't have a poll interval
630+
// Note: FDv1 polling doesn't support ServiceEndpointsOverride, so base URI
631+
// will use the global service endpoints configuration
632+
}
633+
634+
return fdv1PollingBuilder;
635+
}
636+
571637
private MigrationVariationResponse DoMigrationVariation(MigrationVariationParams migrationVariation)
572638
{
573639
var defaultStage = MigrationStageExtensions.FromDataModelString(migrationVariation.DefaultStage);
Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1 @@
1-
streaming/retry behavior/do not retry after unrecoverable HTTP error on initial connect/error 401
2-
streaming/retry behavior/do not retry after unrecoverable HTTP error on initial connect/error 403
3-
streaming/retry behavior/do not retry after unrecoverable HTTP error on initial connect/error 405
4-
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 401
5-
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 403
6-
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 405
7-
streaming/fdv2/reconnection state management/initializes from 2 polling initializers
8-
streaming/fdv2/fallback to FDv1 handling
91
streaming/fdv2/disconnects on goodbye
10-
streaming/fdv2/reconnection state management/initializes from polling initializer

pkgs/sdk/server/src/Interfaces/DataSourceStatus.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ public struct ErrorInfo
6262
/// </summary>
6363
public ErrorKind Kind { get; set; }
6464

65+
/// <summary>
66+
/// Whether the error is recoverable. Recoverable errors are those that can be retried, such as network errors. Unrecoverable
67+
/// errors are those that cannot be retried, such as invalid SDK key errors.
68+
/// </summary>
69+
public bool Recoverable { get; set; }
70+
6571
/// <summary>
6672
/// The HTTP status code if the error was <see cref="ErrorKind.ErrorResponse"/>, or zero otherwise.
6773
/// </summary>
@@ -80,28 +86,38 @@ public struct ErrorInfo
8086
/// </summary>
8187
public DateTime Time { get; set; }
8288

89+
/// <summary>
90+
/// The error indicates to fall back to FDv1. (At the time of writing this, this was indicated
91+
/// via the x-ld-fd-fallback header, but this may change in the future. This is just info for posterity.)
92+
/// </summary>
93+
public bool FDv1Fallback { get; set; }
94+
8395
/// <summary>
8496
/// Constructs an instance based on an exception.
8597
/// </summary>
8698
/// <param name="e">the exception</param>
99+
/// <param name="recoverable">whether the error is recoverable</param>
87100
/// <returns>an ErrorInfo</returns>
88-
public static ErrorInfo FromException(Exception e) => new ErrorInfo
101+
public static ErrorInfo FromException(Exception e, bool recoverable) => new ErrorInfo
89102
{
90103
Kind = e is IOException ? ErrorKind.NetworkError : ErrorKind.Unknown,
91104
Message = e.Message,
92-
Time = DateTime.Now
105+
Time = DateTime.Now,
106+
Recoverable = recoverable
93107
};
94108

95109
/// <summary>
96110
/// Constructs an instance based on an HTTP error status.
97111
/// </summary>
98112
/// <param name="statusCode">the status code</param>
113+
/// <param name="recoverable">whether the error is recoverable</param>
99114
/// <returns>an ErrorInfo</returns>
100-
public static ErrorInfo FromHttpError(int statusCode) => new ErrorInfo
115+
public static ErrorInfo FromHttpError(int statusCode, bool recoverable) => new ErrorInfo
101116
{
102117
Kind = ErrorKind.ErrorResponse,
103118
StatusCode = statusCode,
104-
Time = DateTime.Now
119+
Time = DateTime.Now,
120+
Recoverable = recoverable
105121
};
106122

107123
/// <inheritdoc/>

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
using LaunchDarkly.Sdk.Server.Interfaces;
55
using LaunchDarkly.Sdk.Server.Subsystems;
66

7-
using static LaunchDarkly.Sdk.Server.Subsystems.DataStoreTypes;
8-
97
namespace LaunchDarkly.Sdk.Server.Internal.DataSources
108
{
119
/// <summary>
@@ -196,6 +194,7 @@ private void TryFindNextUnderLock()
196194
}
197195

198196
var entry = _sourcesList.Next();
197+
199198
if (entry.Factory == null)
200199
{
201200
// Failed to find a next source, report error and shut down the composite source

pkgs/sdk/server/src/Internal/DataSources/PollingDataSource.cs

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ internal sealed class PollingDataSource : IDataSource
2323
private readonly Logger _log;
2424
private CancellationTokenSource _canceller;
2525

26+
private bool _disposed = false;
27+
private readonly AtomicBoolean _shuttingDown = new AtomicBoolean(false);
28+
2629
internal PollingDataSource(
2730
LdClientContext context,
2831
IFeatureRequestor featureRequestor,
@@ -81,62 +84,80 @@ private async Task UpdateTaskAsync()
8184
}
8285
catch (UnsuccessfulResponseException ex)
8386
{
84-
var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode);
87+
var recoverable = HttpErrors.IsRecoverable(ex.StatusCode);
88+
var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode, recoverable);
8589

86-
if (HttpErrors.IsRecoverable(ex.StatusCode))
90+
if (errorInfo.Recoverable)
8791
{
8892
_log.Warn(HttpErrors.ErrorMessage(ex.StatusCode, "polling request", "will retry"));
8993
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
9094
}
9195
else
9296
{
9397
_log.Error(HttpErrors.ErrorMessage(ex.StatusCode, "polling request", ""));
94-
_dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
9598
try
9699
{
97100
// if client is initializing, make it stop waiting
98-
_initTask.SetResult(true);
101+
_initTask.SetResult(false);
99102
}
100103
catch (InvalidOperationException)
101104
{
102105
// the task was already set - nothing more to do
103106
}
104-
((IDisposable)this).Dispose();
107+
Shutdown(errorInfo);
105108
}
106109
}
107110
catch (JsonException ex)
108111
{
109112
_log.Error("Polling request received malformed data: {0}", LogValues.ExceptionSummary(ex));
110-
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted,
111-
new DataSourceStatus.ErrorInfo
112-
{
113-
Kind = DataSourceStatus.ErrorKind.InvalidData,
114-
Time = DateTime.Now
115-
});
113+
var errorInfo = new DataSourceStatus.ErrorInfo
114+
{
115+
Kind = DataSourceStatus.ErrorKind.InvalidData,
116+
Message = ex.Message,
117+
Time = DateTime.Now,
118+
Recoverable = true
119+
};
120+
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
116121
}
117122
catch (Exception ex)
118123
{
119124
Exception realEx = (ex is AggregateException ae) ? ae.Flatten() : ex;
120125
_log.Warn("Polling for feature flag updates failed: {0}", LogValues.ExceptionSummary(ex));
121126
_log.Debug(LogValues.ExceptionTrace(ex));
122-
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted,
123-
DataSourceStatus.ErrorInfo.FromException(realEx));
127+
var errorInfo = DataSourceStatus.ErrorInfo.FromException(realEx, true); // default to recoverable
128+
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
124129
}
125130
}
126131

127132
void IDisposable.Dispose()
128133
{
134+
// dispose is currently overloaded with shutdown responsibility, we handle this first
135+
Shutdown(null);
136+
129137
Dispose(true);
130138
GC.SuppressFinalize(this);
131139
}
132140

133141
private void Dispose(bool disposing)
134142
{
135-
if (disposing)
136-
{
137-
_canceller?.Cancel();
143+
if (_disposed) return;
144+
145+
if (disposing) {
146+
// dispose managed resources if any
138147
_featureRequestor.Dispose();
139148
}
149+
150+
_disposed = true;
151+
}
152+
153+
private void Shutdown(DataSourceStatus.ErrorInfo? errorInfo)
154+
{
155+
// Prevent concurrent shutdown calls - only allow the first call to proceed
156+
// GetAndSet returns the OLD value, so if it was already true, we return early
157+
if (_shuttingDown.GetAndSet(true)) return;
158+
159+
_canceller?.Cancel();
160+
_dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
140161
}
141162

142163
private bool InitWithHeaders(DataStoreTypes.FullDataSet<DataStoreTypes.ItemDescriptor> allData,

pkgs/sdk/server/src/Internal/DataSources/StreamingDataSource.cs

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ internal class StreamingDataSource : IDataSource
5252

5353
private IEnumerable<KeyValuePair<string, IEnumerable<string>>> _headers;
5454

55+
private bool _disposed = false;
56+
private readonly AtomicBoolean _shuttingDown = new AtomicBoolean(false);
57+
5558
internal delegate IEventSource EventSourceCreator(Uri streamUri,
5659
HttpConfiguration httpConfig);
5760

@@ -104,20 +107,36 @@ public Task<bool> Start()
104107

105108
public void Dispose()
106109
{
110+
// dispose is currently overloaded with shutdown responsibility, we handle this first
111+
Shutdown(null);
112+
107113
Dispose(true);
108114
GC.SuppressFinalize(this);
109115
}
110116

111117
private void Dispose(bool disposing)
112118
{
113-
if (disposing)
119+
if (_disposed) return;
120+
121+
if (disposing) {
122+
// dispose managed resources if any
123+
}
124+
125+
_disposed = true;
126+
}
127+
128+
private void Shutdown(DataSourceStatus.ErrorInfo? errorInfo)
129+
{
130+
// Prevent concurrent shutdown calls - only allow the first call to proceed
131+
// GetAndSet returns the OLD value, so if it was already true, we return early
132+
if (_shuttingDown.GetAndSet(true)) return;
133+
134+
_es.Close();
135+
if (_storeStatusMonitoringEnabled)
114136
{
115-
_es.Close();
116-
if (_storeStatusMonitoringEnabled)
117-
{
118-
_dataSourceUpdates.DataStoreStatusProvider.StatusChanged -= OnDataStoreStatusChanged;
119-
}
137+
_dataSourceUpdates.DataStoreStatusProvider.StatusChanged -= OnDataStoreStatusChanged;
120138
}
139+
_dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
121140
}
122141

123142
#endregion
@@ -175,7 +194,8 @@ private void OnMessage(object sender, EventSource.MessageReceivedEventArgs e)
175194
{
176195
Kind = DataSourceStatus.ErrorKind.InvalidData,
177196
Message = ex.Message,
178-
Time = DateTime.Now
197+
Time = DateTime.Now,
198+
Recoverable = true
179199
};
180200
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
181201

@@ -187,7 +207,8 @@ private void OnMessage(object sender, EventSource.MessageReceivedEventArgs e)
187207
{
188208
Kind = DataSourceStatus.ErrorKind.StoreError,
189209
Message = (ex.InnerException ?? ex).Message,
190-
Time = DateTime.Now
210+
Time = DateTime.Now,
211+
Recoverable = true
191212
};
192213
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
193214
if (!_storeStatusMonitoringEnabled)
@@ -210,17 +231,16 @@ private void OnMessage(object sender, EventSource.MessageReceivedEventArgs e)
210231
private void OnError(object sender, EventSource.ExceptionEventArgs e)
211232
{
212233
var ex = e.Exception;
213-
var recoverable = true;
214234
DataSourceStatus.ErrorInfo errorInfo;
215235

216236
if (ex is EventSourceServiceUnsuccessfulResponseException respEx)
217237
{
218238
int status = respEx.StatusCode;
219-
errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(status);
239+
var recoverable = HttpErrors.IsRecoverable(status);
240+
errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(status, recoverable);
220241
RecordStreamInit(true);
221-
if (!HttpErrors.IsRecoverable(status))
242+
if (!recoverable)
222243
{
223-
recoverable = false;
224244
_log.Error(HttpErrors.ErrorMessage(status, "streaming connection", ""));
225245
}
226246
else
@@ -230,21 +250,23 @@ private void OnError(object sender, EventSource.ExceptionEventArgs e)
230250
}
231251
else
232252
{
233-
errorInfo = DataSourceStatus.ErrorInfo.FromException(ex);
253+
errorInfo = DataSourceStatus.ErrorInfo.FromException(ex, true); // default to recoverable
234254
_log.Warn("Encountered EventSource error: {0}", LogValues.ExceptionSummary(ex));
235255
_log.Debug(LogValues.ExceptionTrace(ex));
236256
}
237257

238-
_dataSourceUpdates.UpdateStatus(recoverable ? DataSourceState.Interrupted : DataSourceState.Off,
239-
errorInfo);
240-
241-
if (!recoverable)
258+
if (errorInfo.Recoverable)
259+
{
260+
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
261+
return;
262+
}
263+
else
242264
{
243265
// Make _initTask complete to tell the client to stop waiting for initialization. We use
244266
// TrySetResult rather than SetResult here because it might have already been completed
245267
// (if for instance the stream started successfully, then restarted and got a 401).
246268
_initTask.TrySetResult(false);
247-
((IDisposable)this).Dispose();
269+
Shutdown(errorInfo);
248270
}
249271
}
250272

0 commit comments

Comments
 (0)