Skip to content

Commit 615df4c

Browse files
authored
chore: Add initial FDv2 streaming data source. (#185)
Adds an FDv2 Streaming Data Source which handles an FDv2 stream, but produces FDv1 store updates. Later the event source will be updated to produce output in FDv2 format for the transactional store. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Adds an FDv2 SSE data source that translates FDv2 changes into FDv1 store updates, introduces JsonElement-based deserialization paths, and includes supporting utilities and tests. > > - **Data sources**: > - **FDv2 streaming**: New `Internal/FDv2DataSources/FDv2StreamingDataSource` handling SSE events, selector-based query params, header-aware init, error/status handling, and upserts via FDv1 APIs. > - **FDv2→FDv1 translation**: New `FDv2ChangeSetTranslator` converts FDv2 change sets into `PutData`/`PatchData` for existing store interfaces. > - **Endpoints/Logging**: Add `StandardEndpoints.FDv2StreamingRequestPath` and `FDv2PollingRequestPath`; add `LogNames.FDv2DataSourceSubLog`. > - **Model/serialization**: > - Extend `DataStoreTypes.DataKind` with `DeserializeFromJsonElement`; update `DataModel.Features`/`Segments` to wire new deserializers. > - In FDv2 payloads, change `PutObject.Object` to `JsonElement`; add `FDv2Event.TryDeserializeFromJsonString`. > - **Utilities**: > - New `Internal/FDv2DataSources/QueryStringHelper` for parsing/building query strings. > - **Tests**: > - Add comprehensive tests for `FDv2StreamingDataSource`, `QueryStringHelper`, and FDv2 payload/protocol behavior; update protocol tests for `JsonElement` objects. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 59b4d95. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent d182f7e commit 615df4c

File tree

15 files changed

+1911
-44
lines changed

15 files changed

+1911
-44
lines changed

pkgs/sdk/server/src/DataModel.cs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using System.Collections.Generic;
22
using System.Text.Json;
33
using LaunchDarkly.Sdk.Server.Internal.Model;
4-
54
using static LaunchDarkly.Sdk.Server.Subsystems.DataStoreTypes;
65

76
namespace LaunchDarkly.Sdk.Server
@@ -25,7 +24,8 @@ public static class DataModel
2524
/// Applications should not need to reference this object directly. It is public so that custom integrations
2625
/// and test code can serialize or deserialize data or inject it into a data store.
2726
/// </remarks>
28-
public static DataKind Features = new DataKind("features", SerializeFlag, DeserializeFlag);
27+
public static DataKind Features =
28+
new DataKind("features", SerializeFlag, DeserializeFlag, DeserializeFlagFromJsonElement);
2929

3030
/// <summary>
3131
/// The <see cref="DataKind"/> instance that describes user segment data.
@@ -34,7 +34,8 @@ public static class DataModel
3434
/// Applications should not need to reference this object directly. It is public so that custom integrations
3535
/// and test code can serialize or deserialize data or inject it into a data store.
3636
/// </remarks>
37-
public static DataKind Segments = new DataKind("segments", SerializeSegment, DeserializeSegment);
37+
public static DataKind Segments = new DataKind("segments", SerializeSegment, DeserializeSegment,
38+
DeserializeSegmentFromJsonElement);
3839

3940
/// <summary>
4041
/// An enumeration of all supported <see cref="DataKind"/>s.
@@ -59,8 +60,13 @@ private static void SerializeFlag(object o, Utf8JsonWriter w) =>
5960
private static ItemDescriptor DeserializeFlag(ref Utf8JsonReader r)
6061
{
6162
var flag = FeatureFlagSerialization.Instance.Read(ref r, null, null) as FeatureFlag;
62-
return flag.Deleted ? ItemDescriptor.Deleted(flag.Version) :
63-
new ItemDescriptor(flag.Version, flag);
63+
return flag.Deleted ? ItemDescriptor.Deleted(flag.Version) : new ItemDescriptor(flag.Version, flag);
64+
}
65+
66+
private static ItemDescriptor DeserializeFlagFromJsonElement(JsonElement element)
67+
{
68+
var flag = element.Deserialize<FeatureFlag>();
69+
return flag.Deleted ? ItemDescriptor.Deleted(flag.Version) : new ItemDescriptor(flag.Version, flag);
6470
}
6571

6672
private static void SerializeSegment(object o, Utf8JsonWriter w) =>
@@ -69,8 +75,15 @@ private static void SerializeSegment(object o, Utf8JsonWriter w) =>
6975
private static ItemDescriptor DeserializeSegment(ref Utf8JsonReader r)
7076
{
7177
var segment = SegmentSerialization.Instance.Read(ref r, null, null) as Segment;
72-
return segment.Deleted ? ItemDescriptor.Deleted(segment.Version) :
73-
new ItemDescriptor(segment.Version, segment);
78+
return segment.Deleted
79+
? ItemDescriptor.Deleted(segment.Version)
80+
: new ItemDescriptor(segment.Version, segment);
81+
}
82+
83+
private static ItemDescriptor DeserializeSegmentFromJsonElement(JsonElement element)
84+
{
85+
var flag = element.Deserialize<Segment>();
86+
return flag.Deleted ? ItemDescriptor.Deleted(flag.Version) : new ItemDescriptor(flag.Version, flag);
7487
}
7588
}
7689
}

pkgs/sdk/server/src/Internal/DataSources/StreamingDataSource.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ internal class StreamingDataSource : IDataSource
4141
private readonly Logger _log;
4242

4343
private readonly IEventSource _es;
44+
/// <summary>
45+
/// When the store enters a failed state, and we don't have "data source monitoring", we want to log
46+
/// a message that we are restarting the event source. We don't want to log this message on multiple
47+
/// sequential failures. This boolean is used to determine if the previous attempt to write also
48+
/// failed, and in which case we will not log.
49+
/// </summary>
4450
private volatile bool _lastStoreUpdateFailed = false;
4551
internal DateTime _esStarted; // exposed for testing
4652

pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSet.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ internal sealed class FDv2Change
7272
/// <summary>
7373
/// The raw JSON string representing the object data (only present for Put operations).
7474
/// </summary>
75-
public string Object { get; }
75+
public JsonElement? Object { get; }
7676

7777
/// <summary>
7878
/// Constructs a new Change.
@@ -83,7 +83,7 @@ internal sealed class FDv2Change
8383
/// <param name="version">The version of the change.</param>
8484
/// <param name="obj">The raw JSON string representing the object data (required for Put operations).</param>
8585
/// <exception cref="ArgumentNullException">Thrown when <paramref name="kind"/> or <paramref name="key"/> is null.</exception>
86-
public FDv2Change(FDv2ChangeType type, string kind, string key, int version, string obj = null)
86+
public FDv2Change(FDv2ChangeType type, string kind, string key, int version, JsonElement? obj = null)
8787
{
8888
Type = type;
8989
Kind = kind ?? throw new ArgumentNullException(nameof(kind));
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using LaunchDarkly.Logging;
4+
using LaunchDarkly.Sdk.Server.Internal.DataSources;
5+
using LaunchDarkly.Sdk.Server.Subsystems;
6+
7+
namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources
8+
{
9+
/// <summary>
10+
/// Translates FDv2 changesets into FDv1 format.
11+
/// </summary>
12+
internal static class FDv2ChangeSetTranslator
13+
{
14+
/// <summary>
15+
/// Translates an FDv2 changeset with Full or None type into PutData.
16+
/// </summary>
17+
/// <param name="changeset">The changeset to translate.</param>
18+
/// <param name="log">Logger for diagnostic messages.</param>
19+
/// <returns>PutData containing the full dataset organized by data kind.</returns>
20+
public static StreamProcessorEvents.PutData TranslatePutData(FDv2ChangeSet changeset, Logger log)
21+
{
22+
var dataBuilder = System.Collections.Immutable.ImmutableList
23+
.CreateBuilder<KeyValuePair<DataStoreTypes.DataKind,
24+
DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>>();
25+
26+
var changesByKind = changeset.Changes.GroupBy(c => c.Kind);
27+
28+
foreach (var kindGroup in changesByKind)
29+
{
30+
var kind = kindGroup.Key;
31+
var dataKind = GetDataKind(kind);
32+
33+
if (dataKind == null)
34+
{
35+
log.Warn($"Unknown data kind '{kind}' in changeset, skipping");
36+
continue;
37+
}
38+
39+
var itemsBuilder = System.Collections.Immutable.ImmutableList
40+
.CreateBuilder<KeyValuePair<string, DataStoreTypes.ItemDescriptor>>();
41+
42+
foreach (var change in kindGroup)
43+
{
44+
if (change.Type != FDv2ChangeType.Put || !change.Object.HasValue) continue;
45+
var item = dataKind.DeserializeFromJsonElement(change.Object.Value);
46+
itemsBuilder.Add(new KeyValuePair<string, DataStoreTypes.ItemDescriptor>(change.Key, item));
47+
// Note: Delete operations in a Full changeset would be unusual, but we skip them
48+
// since a full transfer should only contain items that exist
49+
}
50+
51+
dataBuilder.Add(
52+
new KeyValuePair<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>(
53+
dataKind,
54+
new DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>(itemsBuilder.ToImmutable())
55+
));
56+
}
57+
58+
return new StreamProcessorEvents.PutData("/",
59+
new DataStoreTypes.FullDataSet<DataStoreTypes.ItemDescriptor>(dataBuilder.ToImmutable()));
60+
}
61+
62+
/// <summary>
63+
/// Translates an FDv2 changeset with Partial type into a list of PatchData.
64+
/// </summary>
65+
/// <param name="changeset">The changeset to translate.</param>
66+
/// <param name="log">Logger for diagnostic messages.</param>
67+
/// <returns>List of PatchData representing individual upserts or deletes.</returns>
68+
public static List<StreamProcessorEvents.PatchData> TranslatePatchData(FDv2ChangeSet changeset, Logger log)
69+
{
70+
var patches = new List<StreamProcessorEvents.PatchData>();
71+
72+
foreach (var change in changeset.Changes)
73+
{
74+
var dataKind = GetDataKind(change.Kind);
75+
76+
if (dataKind == null)
77+
{
78+
log.Warn($"Unknown data kind '{change.Kind}' in change, skipping");
79+
continue;
80+
}
81+
82+
DataStoreTypes.ItemDescriptor item;
83+
84+
switch (change.Type)
85+
{
86+
case FDv2ChangeType.Put when !change.Object.HasValue:
87+
log.Warn($"Put operation for {change.Kind}/{change.Key} missing object data, skipping");
88+
continue;
89+
// Deserialize the object using the DataKind's deserializer
90+
case FDv2ChangeType.Put:
91+
item = dataKind.DeserializeFromJsonElement(change.Object.Value);
92+
break;
93+
case FDv2ChangeType.Delete:
94+
// For deletes, create a deleted ItemDescriptor with the version
95+
item = DataStoreTypes.ItemDescriptor.Deleted(change.Version);
96+
break;
97+
default:
98+
log.Warn($"Unknown change type for {change.Kind}/{change.Key}, skipping");
99+
continue;
100+
}
101+
102+
patches.Add(new StreamProcessorEvents.PatchData(dataKind, change.Key, item));
103+
}
104+
105+
return patches;
106+
}
107+
108+
/// <summary>
109+
/// Maps an FDv2 object kind to the corresponding DataKind.
110+
/// </summary>
111+
/// <param name="kind">The kind string from the FDv2 change.</param>
112+
/// <returns>The corresponding DataKind, or null if the kind is not recognized.</returns>
113+
private static DataStoreTypes.DataKind GetDataKind(string kind)
114+
{
115+
switch (kind)
116+
{
117+
case "flag":
118+
return DataModel.Features;
119+
case "segment":
120+
return DataModel.Segments;
121+
default:
122+
return null;
123+
}
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)