Skip to content

Commit 69fb14a

Browse files
changed capacity behavior of buffer collection GetOrCreate() and renamed to EnsureBuffer() to make it explicit
1 parent 7914674 commit 69fb14a

File tree

3 files changed

+77
-32
lines changed

3 files changed

+77
-32
lines changed

DurableStateMachines.Tests/DurableRingBufferCollectionTests.cs

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using Microsoft.Extensions.DependencyInjection;
2-
using System.Diagnostics;
32

43
namespace DurableStateMachines.Tests;
54

@@ -8,11 +7,18 @@ public class DurableRingBufferCollectionTests(TestFixture fixture)
87
{
98
public interface IDurableRingBufferCollectionGrain : IGrainWithStringKey
109
{
10+
// Workaround methods to survive deactivation
11+
Task<Dictionary<string, int>> GetAllCapacities();
12+
Task SetAllCapacities(Dictionary<string, int> capacities);
13+
14+
// Collection methods
1115
Task<int> GetBuffersCount();
1216
Task<List<string>> GetKeys();
1317
Task<bool> ContainsBuffer(string key);
1418
Task<bool> RemoveBuffer(string key);
1519
Task ClearAll();
20+
21+
// Buffer methods
1622
Task Enqueue(string key, string value);
1723
Task<TryValue<string?>> TryDequeue(string key);
1824
Task SetBufferCapacity(string key, int capacity);
@@ -27,6 +33,25 @@ public class DurableRingBufferCollectionGrain(
2733
: DurableGrain, IDurableRingBufferCollectionGrain
2834
{
2935
private const int DefaultCapacity = 1;
36+
private readonly Dictionary<string, int> _capacities = [];
37+
38+
private IDurableRingBuffer<string> GetBuffer(string key)
39+
{
40+
var capacity = _capacities.GetValueOrDefault(key, DefaultCapacity);
41+
return state.EnsureBuffer(key, capacity);
42+
}
43+
44+
public Task<Dictionary<string, int>> GetAllCapacities() => Task.FromResult(new Dictionary<string, int>(_capacities));
45+
46+
public Task SetAllCapacities(Dictionary<string, int> capacities)
47+
{
48+
_capacities.Clear();
49+
foreach (var (key, value) in capacities)
50+
{
51+
_capacities[key] = value;
52+
}
53+
return Task.CompletedTask;
54+
}
3055

3156
public Task<int> GetBuffersCount() => Task.FromResult(state.Count);
3257
public Task<List<string>> GetKeys() => Task.FromResult(state.Keys.ToList());
@@ -37,26 +62,29 @@ public async Task<bool> RemoveBuffer(string key)
3762
var removed = state.Remove(key);
3863
if (removed)
3964
{
65+
_capacities.Remove(key);
4066
await WriteStateAsync();
4167
}
4268
return removed;
4369
}
4470

4571
public async Task ClearAll()
4672
{
73+
_capacities.Clear();
4774
state.Clear();
75+
4876
await WriteStateAsync();
4977
}
5078

5179
public async Task Enqueue(string key, string value)
5280
{
53-
state.GetOrCreate(key, DefaultCapacity).Enqueue(value);
81+
GetBuffer(key).Enqueue(value);
5482
await WriteStateAsync();
5583
}
5684

5785
public async Task<TryValue<string?>> TryDequeue(string key)
5886
{
59-
var success = state.GetOrCreate(key, DefaultCapacity).TryDequeue(out var item);
87+
var success = GetBuffer(key).TryDequeue(out var item);
6088
if (success)
6189
{
6290
await WriteStateAsync();
@@ -66,19 +94,20 @@ public async Task Enqueue(string key, string value)
6694

6795
public async Task SetBufferCapacity(string key, int capacity)
6896
{
69-
state.GetOrCreate(key, DefaultCapacity).SetCapacity(capacity);
97+
_capacities[key] = capacity;
98+
GetBuffer(key); // This will use the now just set '_capacities[key]', as it does EnsureBuffer(key, capacity).
7099
await WriteStateAsync();
71100
}
72101

73102
public async Task ClearBuffer(string key)
74103
{
75-
state.GetOrCreate(key, DefaultCapacity).Clear();
104+
GetBuffer(key).Clear();
76105
await WriteStateAsync();
77106
}
78107

79-
public Task<int> GetBufferCapacity(string key) => Task.FromResult(state.GetOrCreate(key, DefaultCapacity).Capacity);
80-
public Task<int> GetBufferItemsCount(string key) => Task.FromResult(state.GetOrCreate(key, DefaultCapacity).Count);
81-
public Task<List<string>> GetAllBufferItems(string key) => Task.FromResult(state.GetOrCreate(key, DefaultCapacity).ToList());
108+
public Task<int> GetBufferCapacity(string key) => Task.FromResult(GetBuffer(key).Capacity);
109+
public Task<int> GetBufferItemsCount(string key) => Task.FromResult(GetBuffer(key).Count);
110+
public Task<List<string>> GetAllBufferItems(string key) => Task.FromResult(GetBuffer(key).ToList());
82111
}
83112

84113
private IDurableRingBufferCollectionGrain GetGrain(string key) => fixture.Cluster.Client.GetGrain<IDurableRingBufferCollectionGrain>(key);
@@ -119,15 +148,15 @@ public async Task GetOrCreate()
119148
const string keyA = "BufferA";
120149

121150
// We create a buffer implicitly by setting its capacity.
122-
// This calls GetOrCreate(keyA, DefaultCapacity) which creates the buffer and sets capacity to 10.
123151
await grain.SetBufferCapacity(keyA, 10);
124152
Assert.Equal(10, await grain.GetBufferCapacity(keyA));
125153

126-
// Than we enqueue an item. This calls GetOrCreate(keyA, DefaultCapacity).
127-
// Because the buffer already exists, the capacity parameter should be ignored.
154+
// Than we enqueue an item. This calls GetBuffer(keyA).
155+
// Because the grain now remembers the capacity is 10, it will be used,
156+
// and the durable state's capacity will not be changed.
128157
await grain.Enqueue(keyA, "item1");
129158

130-
// The capacity should have NOT changed to the default.
159+
// The capacity should have NOT changed.
131160
Assert.Equal(10, await grain.GetBufferCapacity(keyA));
132161
Assert.Equal(1, await grain.GetBufferItemsCount(keyA));
133162

@@ -188,7 +217,9 @@ public async Task Persistence()
188217
await grain.Enqueue(KeyB, "b1");
189218
await grain.RemoveBuffer(KeyC);
190219

220+
var capacities1 = await grain.GetAllCapacities();
191221
await DeactivateGrain(grain);
222+
await grain.SetAllCapacities(capacities1);
192223

193224
Assert.Equal(2, await grain.GetBuffersCount());
194225
Assert.True(await grain.ContainsBuffer(KeyA));
@@ -203,7 +234,9 @@ public async Task Persistence()
203234
await grain.TryDequeue(KeyA); // Should only remove item "a1" not "BufferA"
204235
await grain.RemoveBuffer(KeyB);
205236

237+
var capacities2 = await grain.GetAllCapacities();
206238
await DeactivateGrain(grain);
239+
await grain.SetAllCapacities(capacities2);
207240

208241
Assert.Equal(1, await grain.GetBuffersCount());
209242
Assert.True(await grain.ContainsBuffer(KeyA));
@@ -228,7 +261,9 @@ public async Task Restore()
228261
await grain.Enqueue(key, $"item-{i}-2");
229262
}
230263

264+
var capacities = await grain.GetAllCapacities();
231265
await DeactivateGrain(grain); // To trigger a restore from the snapshot
266+
await grain.SetAllCapacities(capacities);
232267

233268
Assert.Equal(numBuffers, await grain.GetBuffersCount());
234269

DurableStateMachines.Tests/DurableRingBufferTests.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ public async Task BasicOperations()
103103
{
104104
var grain = GetGrain("basic");
105105

106+
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(() => grain.SetCapacity(-1));
107+
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(() => grain.SetCapacity(0));
108+
106109
await grain.SetCapacity(3);
107110

108111
await grain.Enqueue("one");

DurableStateMachines/DurableRingBufferCollection.cs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,23 @@ public interface IDurableRingBufferCollection<TKey, TValue> where TKey : notnull
2525
IReadOnlyCollection<TKey> Keys { get; }
2626

2727
/// <summary>
28-
/// Gets or creates the ring buffer associated with the specified key.
28+
/// Ensures that a ring buffer associated with the specified key exists and is configured with the given capacity.
2929
/// </summary>
30-
/// <param name="key">The key of the ring buffer to get or create.</param>
31-
/// <param name="capacity">The capacity to use when creating a new buffer, if it does not exist.</param>
32-
/// <returns>A proxy to the ring buffer that ensures all operations are durable.</returns>
33-
IDurableRingBuffer<TValue> GetOrCreate(TKey key, int capacity);
30+
/// <param name="key">The key of the ring buffer to ensure.</param>
31+
/// <param name="capacity">The desired capacity for the ring buffer.</param>
32+
/// <returns>A durable proxy to the ring buffer, which will have the specified capacity after this call.</returns>
33+
/// <remarks>
34+
/// <para>
35+
/// This method provides a convenient way to get a buffer and set its capacity in a single, atomic operation.
36+
/// If a buffer for the given <paramref name="key"/> does not exist, it will be created with the specified <paramref name="capacity"/>.
37+
/// If the buffer already exists, its capacity will be overwritten with the new value (if its different).
38+
/// </para>
39+
/// <para><strong>
40+
/// Decreasing the capacity on an existing buffer may result in data loss, if the number of items
41+
/// currently in the buffer exceeds the new capacity.
42+
/// </strong></para>
43+
/// </remarks>
44+
IDurableRingBuffer<TValue> EnsureBuffer(TKey key, int capacity);
3445

3546
/// <summary>
3647
/// Determines whether the collection contains a ring buffer with the specified key.
@@ -84,11 +95,12 @@ public DurableRingBufferCollection(
8495
public int Count => _proxies.Count;
8596
public IReadOnlyCollection<TKey> Keys => _proxies.Keys;
8697

87-
public IDurableRingBuffer<TValue> GetOrCreate(TKey key, int capacity)
98+
public IDurableRingBuffer<TValue> EnsureBuffer(TKey key, int capacity)
8899
{
89-
var proxy = GetOrCreateProxy(key, out var created);
100+
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(capacity, nameof(capacity));
90101

91-
if (created)
102+
var proxy = GetOrCreateProxy(key);
103+
if (proxy.Capacity != capacity)
92104
{
93105
proxy.SetCapacity(capacity);
94106
}
@@ -208,7 +220,7 @@ void ApplySnapshot(ref Reader<ReadOnlySequenceInput> reader)
208220
var capacity = (int)reader.ReadVarUInt32();
209221
var itemCount = (int)reader.ReadVarUInt32();
210222

211-
var buffer = GetOrCreateProxy(key, out _).Buffer;
223+
var buffer = GetOrCreateProxy(key).Buffer;
212224
buffer.SetCapacity(capacity);
213225

214226
for (var j = 0; j < itemCount; j++)
@@ -351,23 +363,18 @@ private void ClearBuffer(TKey key)
351363
}
352364

353365
private bool ApplyRemove(TKey key) => _proxies.Remove(key, out _);
354-
private bool ApplySetBufferCapacity(TKey key, int capacity) => GetOrCreateProxy(key, out _).Buffer.SetCapacity(capacity);
355-
private void ApplyEnqueueBufferItem(TKey key, TValue item) => GetOrCreateProxy(key, out _).Buffer.Enqueue(item);
356-
private bool ApplyTryDequeueBufferItem(TKey key, out TValue item) => GetOrCreateProxy(key, out _).Buffer.TryDequeue(out item!);
357-
private bool ApplyClearBuffer(TKey key) => GetOrCreateProxy(key, out _).Buffer.Clear();
366+
private bool ApplySetBufferCapacity(TKey key, int capacity) => GetOrCreateProxy(key).Buffer.SetCapacity(capacity);
367+
private void ApplyEnqueueBufferItem(TKey key, TValue item) => GetOrCreateProxy(key).Buffer.Enqueue(item);
368+
private bool ApplyTryDequeueBufferItem(TKey key, out TValue item) => GetOrCreateProxy(key).Buffer.TryDequeue(out item!);
369+
private bool ApplyClearBuffer(TKey key) => GetOrCreateProxy(key).Buffer.Clear();
358370
private void ApplyClear() => _proxies.Clear();
359371

360-
internal RingBufferProxy GetOrCreateProxy(TKey key, out bool created)
372+
internal RingBufferProxy GetOrCreateProxy(TKey key)
361373
{
362-
created = false;
363-
364374
if (!_proxies.TryGetValue(key, out var proxy))
365375
{
366376
proxy = new RingBufferProxy(key, this);
367-
368377
_proxies[key] = proxy;
369-
370-
created = true;
371378
}
372379

373380
return proxy;
@@ -448,7 +455,7 @@ public KeyValuePair<TKey, TValue[]>[] Items
448455

449456
foreach (var key in collection.Keys)
450457
{
451-
var proxy = collection.GetOrCreateProxy(key, out _);
458+
var proxy = collection.GetOrCreateProxy(key);
452459
result[i++] = new KeyValuePair<TKey, TValue[]>(key, [.. proxy]);
453460
}
454461

0 commit comments

Comments
 (0)