Skip to content

Commit 8152d8b

Browse files
authored
chore: Add support for transactional data stores. (#186)
- Adds support for transactional application of data to the in-memory data store. - Implements a conversion from an FDv2ChangeSet to a DataStore specific ChangeSet. - Changes the FDv2 selector to be generally applicable instead of FDv2 specific. - Does not implement the interface for persistent stores. - Does not add support for data source updates, just the store. - Changes a number of types to structs that should have been structs. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Adds transactional ChangeSet support to the in-memory store and unifies FDv2 selector into a shared Selector used across data flow. > > - **Data store**: > - **Transactional updates**: `InMemoryDataStore` now implements `ITransactionalDataStore` with `Apply(ChangeSet)` supporting `Full`, `Partial`, and `None` updates; maintains `Selector` and preserves/env-id metadata on full init. > - **ChangeSet API**: Introduces `DataStoreTypes.ChangeSet` and `ChangeSetType` for batching changes by `DataKind`. > - **Selector unification**: > - Replaces FDv2-specific selector with shared `Subsystems.Selector`; exposes `Selector` on `ITransactionalDataStore` and `InMemoryDataStore`. > - **FDv2 pipeline**: > - `FDv2ChangeSet` now carries shared `Selector` and uses struct `FDv2Change`. > - New translator `FDv2ChangeSetTranslator.ToChangeSet` converts FDv2 changes to store `ChangeSet`; retains FDv1 `PutData`/`PatchData` translations. > - `FDv2ProtocolHandler` and `FDv2StreamingDataSource` updated to use shared `Selector`. > - **Tests**: > - Extensive tests for transactional apply behavior, selector handling, translator conversion, protocol updates, and streaming integration. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 1808897. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 615df4c commit 8152d8b

File tree

13 files changed

+1127
-50
lines changed

13 files changed

+1127
-50
lines changed

pkgs/sdk/server/src/Internal/DataStores/InMemoryDataStore.cs

Lines changed: 102 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
using System.Collections.Immutable;
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Collections.Immutable;
24
using LaunchDarkly.Sdk.Server.Subsystems;
3-
45
using static LaunchDarkly.Sdk.Server.Subsystems.DataStoreTypes;
56

67
namespace LaunchDarkly.Sdk.Server.Internal.DataStores
@@ -12,16 +13,42 @@ namespace LaunchDarkly.Sdk.Server.Internal.DataStores
1213
/// Application code cannot see this implementation class and uses
1314
/// <see cref="Components.InMemoryDataStore"/> instead.
1415
/// </remarks>
15-
internal class InMemoryDataStore : IDataStore, IDataStoreMetadata
16+
internal class InMemoryDataStore : IDataStore, IDataStoreMetadata, ITransactionalDataStore
1617
{
1718
private readonly object WriterLock = new object();
19+
1820
private volatile ImmutableDictionary<DataKind, ImmutableDictionary<string, ItemDescriptor>> Items =
1921
ImmutableDictionary<DataKind, ImmutableDictionary<string, ItemDescriptor>>.Empty;
22+
2023
private volatile bool _initialized = false;
2124

2225
private volatile InitMetadata _metadata;
2326

24-
internal InMemoryDataStore() { }
27+
private readonly object _selectorLock = new object();
28+
private Selector _selector;
29+
30+
public Selector Selector
31+
{
32+
get
33+
{
34+
lock (_selectorLock)
35+
{
36+
return _selector;
37+
}
38+
}
39+
private set
40+
{
41+
lock (_selectorLock)
42+
{
43+
_selector = value;
44+
}
45+
}
46+
}
47+
48+
internal InMemoryDataStore()
49+
{
50+
Selector = Selector.Empty;
51+
}
2552

2653
public bool StatusMonitoringEnabled => false;
2754

@@ -36,10 +63,12 @@ public void Init(FullDataSet<ItemDescriptor> data)
3663
{
3764
return null;
3865
}
66+
3967
if (!itemsOfKind.TryGetValue(key, out var item))
4068
{
4169
return null;
4270
}
71+
4372
return item;
4473
}
4574

@@ -49,6 +78,7 @@ public KeyedItems<ItemDescriptor> GetAll(DataKind kind)
4978
{
5079
return new KeyedItems<ItemDescriptor>(itemsOfKind);
5180
}
81+
5282
return KeyedItems<ItemDescriptor>.Empty();
5383
}
5484

@@ -60,33 +90,96 @@ public bool Upsert(DataKind kind, string key, ItemDescriptor item)
6090
{
6191
itemsOfKind = ImmutableDictionary<string, ItemDescriptor>.Empty;
6292
}
93+
6394
if (!itemsOfKind.TryGetValue(key, out var old) || old.Version < item.Version)
6495
{
6596
var newItemsOfKind = itemsOfKind.SetItem(key, item);
6697
Items = Items.SetItem(kind, newItemsOfKind);
6798
return true;
6899
}
100+
69101
return false;
70102
}
71103
}
72104

105+
public void Apply(ChangeSet<ItemDescriptor> changeSet)
106+
{
107+
switch (changeSet.Type)
108+
{
109+
case ChangeSetType.Full:
110+
ApplyFullPayload(changeSet.Data, new InitMetadata(changeSet.EnvironmentId), changeSet.Selector);
111+
break;
112+
case ChangeSetType.Partial:
113+
ApplyPartialData(changeSet.Data, changeSet.Selector);
114+
break;
115+
case ChangeSetType.None:
116+
break;
117+
default:
118+
// This represents an implementation error. The ChangeSetType was extended, but handling was not
119+
// added.
120+
throw new ArgumentOutOfRangeException();
121+
}
122+
}
123+
73124
public bool Initialized()
74125
{
75126
return _initialized;
76127
}
77128

78-
public void Dispose() { }
129+
public void Dispose()
130+
{
131+
}
79132

80133
public void InitWithMetadata(FullDataSet<ItemDescriptor> data, InitMetadata metadata)
81134
{
82-
var itemsBuilder = ImmutableDictionary.CreateBuilder<DataKind, ImmutableDictionary<string, ItemDescriptor>>();
135+
ApplyFullPayload(data.Data, metadata, Selector.Empty);
136+
}
137+
138+
private void ApplyPartialData(IEnumerable<KeyValuePair<DataKind, KeyedItems<ItemDescriptor>>> data,
139+
Selector selector)
140+
{
141+
lock (WriterLock)
142+
{
143+
// Build the complete updated dictionary before assigning to Items for transactional update
144+
var itemsBuilder = Items.ToBuilder();
145+
146+
foreach (var kindItemsPair in data)
147+
{
148+
var kind = kindItemsPair.Key;
149+
var kindBuilder = ImmutableDictionary.CreateBuilder<string, ItemDescriptor>();
150+
151+
if (!Items.TryGetValue(kind, out var itemsOfKind))
152+
{
153+
itemsOfKind = ImmutableDictionary<string, ItemDescriptor>.Empty;
154+
}
155+
156+
kindBuilder.AddRange(itemsOfKind);
157+
158+
foreach (var keyValuePair in kindItemsPair.Value.Items)
159+
{
160+
kindBuilder[keyValuePair.Key] = keyValuePair.Value;
161+
}
162+
163+
itemsBuilder[kind] = kindBuilder.ToImmutable();
164+
}
165+
166+
Items = itemsBuilder.ToImmutable();
167+
Selector = selector;
168+
}
169+
}
170+
171+
private void ApplyFullPayload(IEnumerable<KeyValuePair<DataKind, KeyedItems<ItemDescriptor>>> data,
172+
InitMetadata metadata, Selector selector)
173+
{
174+
var itemsBuilder =
175+
ImmutableDictionary.CreateBuilder<DataKind, ImmutableDictionary<string, ItemDescriptor>>();
83176

84-
foreach (var kindEntry in data.Data)
177+
foreach (var kindEntry in data)
85178
{
86179
var kindItemsBuilder = ImmutableDictionary.CreateBuilder<string, ItemDescriptor>();
87180
foreach (var e1 in kindEntry.Value.Items)
88181
{
89-
kindItemsBuilder.Add(e1.Key, e1.Value);
182+
kindItemsBuilder[e1.Key] = e1.Value;
90183
}
91184

92185
itemsBuilder.Add(kindEntry.Key, kindItemsBuilder.ToImmutable());
@@ -99,6 +192,7 @@ public void InitWithMetadata(FullDataSet<ItemDescriptor> data, InitMetadata meta
99192
Items = newItems;
100193
_metadata = metadata;
101194
_initialized = true;
195+
Selector = selector;
102196
}
103197
}
104198

pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSet.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Immutable;
33
using System.Text.Json;
4+
using LaunchDarkly.Sdk.Server.Subsystems;
45

56
namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources
67
{
@@ -41,7 +42,7 @@ internal enum FDv2ChangeSetType
4142
/// <summary>
4243
/// Represents a single change to a data object.
4344
/// </summary>
44-
internal sealed class FDv2Change
45+
internal struct FDv2Change
4546
{
4647
/// <summary>
4748
/// The type of change operation.
@@ -114,26 +115,26 @@ internal sealed class FDv2ChangeSet
114115
/// <summary>
115116
/// The selector (version identifier) for this changeset.
116117
/// </summary>
117-
public FDv2Selector FDv2Selector { get; }
118+
public Selector Selector { get; }
118119

119120
/// <summary>
120121
/// Constructs a new ChangeSet.
121122
/// </summary>
122123
/// <param name="type">The type of the changeset.</param>
123124
/// <param name="changes">The list of changes.</param>
124-
/// <param name="fDv2Selector">The selector.</param>
125+
/// <param name="selector">The selector.</param>
125126
/// <exception cref="ArgumentNullException">Thrown when <paramref name="changes"/> is null.</exception>
126-
public FDv2ChangeSet(FDv2ChangeSetType type, ImmutableList<FDv2Change> changes, FDv2Selector fDv2Selector)
127+
public FDv2ChangeSet(FDv2ChangeSetType type, ImmutableList<FDv2Change> changes, Selector selector)
127128
{
128129
Type = type;
129130
Changes = changes ?? throw new ArgumentNullException(nameof(changes));
130-
FDv2Selector = fDv2Selector;
131+
Selector = selector;
131132
}
132133

133134
/// <summary>
134135
/// An empty changeset that indicates no changes are required.
135136
/// </summary>
136137
public static FDv2ChangeSet None { get; } =
137-
new FDv2ChangeSet(FDv2ChangeSetType.None, ImmutableList<FDv2Change>.Empty, FDv2Selector.Empty);
138+
new FDv2ChangeSet(FDv2ChangeSetType.None, ImmutableList<FDv2Change>.Empty, Selector.Empty);
138139
}
139140
}

pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSetTranslator.cs

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Collections.Generic;
23
using System.Linq;
34
using LaunchDarkly.Logging;
@@ -7,12 +8,104 @@
78
namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources
89
{
910
/// <summary>
10-
/// Translates FDv2 changesets into FDv1 format.
11+
/// Translates FDv2 changesets into data store formats.
1112
/// </summary>
1213
internal static class FDv2ChangeSetTranslator
1314
{
1415
/// <summary>
15-
/// Translates an FDv2 changeset with Full or None type into PutData.
16+
/// Converts an FDv2ChangeSet to a DataStoreTypes.ChangeSet.
17+
/// </summary>
18+
/// <param name="changeset">The FDv2 changeset to convert.</param>
19+
/// <param name="log">Logger for diagnostic messages.</param>
20+
/// <param name="environmentId">The environment ID to include in the changeset.</param>
21+
/// <returns>A DataStoreTypes.ChangeSet containing the converted data.</returns>
22+
/// <exception cref="ArgumentOutOfRangeException">Thrown when the changeset type is unknown.</exception>
23+
public static DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> ToChangeSet(
24+
FDv2ChangeSet changeset,
25+
Logger log,
26+
string environmentId = null)
27+
{
28+
DataStoreTypes.ChangeSetType changeSetType;
29+
switch (changeset.Type)
30+
{
31+
case FDv2ChangeSetType.Full:
32+
changeSetType = DataStoreTypes.ChangeSetType.Full;
33+
break;
34+
case FDv2ChangeSetType.Partial:
35+
changeSetType = DataStoreTypes.ChangeSetType.Partial;
36+
break;
37+
case FDv2ChangeSetType.None:
38+
changeSetType = DataStoreTypes.ChangeSetType.None;
39+
break;
40+
default:
41+
throw new ArgumentOutOfRangeException(nameof(changeset),
42+
$"Unknown FDv2ChangeSetType: {changeset.Type}. This is an implementation error.");
43+
}
44+
45+
// Use a dictionary to group items by DataKind in a single pass
46+
var kindToItems = new Dictionary<DataStoreTypes.DataKind,
47+
System.Collections.Immutable.ImmutableList<KeyValuePair<string, DataStoreTypes.ItemDescriptor>>.Builder>();
48+
49+
foreach (var change in changeset.Changes)
50+
{
51+
var dataKind = GetDataKind(change.Kind);
52+
53+
if (dataKind == null)
54+
{
55+
log.Warn($"Unknown data kind '{change.Kind}' in changeset, skipping");
56+
continue;
57+
}
58+
59+
DataStoreTypes.ItemDescriptor item;
60+
61+
switch (change.Type)
62+
{
63+
case FDv2ChangeType.Put when !change.Object.HasValue:
64+
log.Warn($"Put operation for {change.Kind}/{change.Key} missing object data, skipping");
65+
continue;
66+
case FDv2ChangeType.Put:
67+
item = dataKind.DeserializeFromJsonElement(change.Object.Value);
68+
break;
69+
case FDv2ChangeType.Delete:
70+
item = DataStoreTypes.ItemDescriptor.Deleted(change.Version);
71+
break;
72+
default:
73+
throw new ArgumentOutOfRangeException(nameof(change),
74+
$"Unknown FDv2ChangeType: {change.Type}. This is an implementation error.");
75+
}
76+
77+
if (!kindToItems.TryGetValue(dataKind, out var itemsBuilder))
78+
{
79+
itemsBuilder = System.Collections.Immutable.ImmutableList
80+
.CreateBuilder<KeyValuePair<string, DataStoreTypes.ItemDescriptor>>();
81+
kindToItems[dataKind] = itemsBuilder;
82+
}
83+
84+
itemsBuilder.Add(new KeyValuePair<string, DataStoreTypes.ItemDescriptor>(change.Key, item));
85+
}
86+
87+
var dataBuilder = System.Collections.Immutable.ImmutableList
88+
.CreateBuilder<KeyValuePair<DataStoreTypes.DataKind,
89+
DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>>();
90+
91+
foreach (var kvp in kindToItems)
92+
{
93+
dataBuilder.Add(
94+
new KeyValuePair<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>(
95+
kvp.Key,
96+
new DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>(kvp.Value.ToImmutable())
97+
));
98+
}
99+
100+
return new DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor>(
101+
changeSetType,
102+
changeset.Selector,
103+
dataBuilder.ToImmutable(),
104+
environmentId);
105+
}
106+
107+
/// <summary>
108+
/// Translates an FDv2 changeset with Full or None type into FDv1 PutData format.
16109
/// </summary>
17110
/// <param name="changeset">The changeset to translate.</param>
18111
/// <param name="log">Logger for diagnostic messages.</param>
@@ -60,7 +153,7 @@ public static StreamProcessorEvents.PutData TranslatePutData(FDv2ChangeSet chang
60153
}
61154

62155
/// <summary>
63-
/// Translates an FDv2 changeset with Partial type into a list of PatchData.
156+
/// Translates an FDv2 changeset with Partial type into a list of FDv1 PatchData format.
64157
/// </summary>
65158
/// <param name="changeset">The changeset to translate.</param>
66159
/// <param name="log">Logger for diagnostic messages.</param>

pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ProtocolHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Linq;
44
using System.Text.Json;
55
using LaunchDarkly.Sdk.Server.Internal.FDv2Payloads;
6+
using LaunchDarkly.Sdk.Server.Subsystems;
67

78
namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources
89
{
@@ -277,7 +278,7 @@ private IFDv2ProtocolAction PayloadTransferred(PayloadTransferred payload)
277278
}
278279

279280
var changeset = new FDv2ChangeSet(changeSetType, _changes.ToImmutableList(),
280-
FDv2Selector.Make(payload.Version, payload.State));
281+
Selector.Make(payload.Version, payload.State));
281282
_state = FDv2ProtocolState.Changes;
282283
_changes.Clear();
283284
return new FDv2ActionChangeset(changeset);

pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2Selector.cs

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments
 (0)