Skip to content

Commit 905d945

Browse files
Added Contains(value) for durable ring and time-window buffers
1 parent ae2a85a commit 905d945

File tree

6 files changed

+143
-0
lines changed

6 files changed

+143
-0
lines changed

DurableStateMachines.Tests/DurableRingBufferTests.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public interface IDurableRingBufferGrain : IGrainWithStringKey
1515
Task<int> GetCount();
1616
Task<bool> IsEmpty();
1717
Task<bool> IsFull();
18+
Task<bool> Contains(string item);
1819
Task<List<string>> GetAll();
1920
Task<(int, string[])> CopyToArray(int arraySize, int arrayIndex);
2021
Task<(int, string[])> DrainToArray(int arraySize, int arrayIndex);
@@ -57,6 +58,7 @@ public async Task Clear()
5758
public Task<int> GetCount() => Task.FromResult(state.Count);
5859
public Task<bool> IsEmpty() => Task.FromResult(state.IsEmpty);
5960
public Task<bool> IsFull() => Task.FromResult(state.IsFull);
61+
public Task<bool> Contains(string item) => Task.FromResult(state.Contains(item));
6062
public Task<List<string>> GetAll() => Task.FromResult(state.ToList());
6163

6264
public Task<(int, string[])> CopyToArray(int arraySize, int arrayIndex)
@@ -128,6 +130,33 @@ public async Task BasicOperations()
128130
Assert.Equal("three", items2.Last());
129131
}
130132

133+
[Fact]
134+
public async Task Contains()
135+
{
136+
var grain = GetGrain("contains");
137+
await grain.SetCapacity(3);
138+
139+
Assert.False(await grain.Contains("one"));
140+
141+
await grain.Enqueue("one");
142+
await grain.Enqueue("two");
143+
144+
Assert.True(await grain.Contains("one"));
145+
Assert.True(await grain.Contains("two"));
146+
Assert.False(await grain.Contains("three"));
147+
148+
await grain.Enqueue("three");
149+
await grain.Enqueue("four");
150+
151+
Assert.False(await grain.Contains("one"));
152+
Assert.True(await grain.Contains("four"));
153+
154+
await grain.TryDequeue();
155+
156+
Assert.False(await grain.Contains("two"));
157+
Assert.True(await grain.Contains("three"));
158+
}
159+
131160
[Fact]
132161
public async Task TryDequeueRemovesOldestItem()
133162
{

DurableStateMachines.Tests/DurableTimeWindowBufferTests.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public interface IDurableTimeWindowBufferGrain : IGrainWithStringKey
1414
Task<TimeSpan> GetWindow();
1515
Task<int> GetCount();
1616
Task<bool> IsEmpty();
17+
Task<bool> Contains(string value);
1718
Task<List<string>> GetAll();
1819
Task<(int, string[])> CopyToArray(int arraySize, int arrayIndex);
1920
Task<(int, string[])> DrainToArray(int arraySize, int arrayIndex);
@@ -55,6 +56,7 @@ public async Task Clear()
5556
public Task<TimeSpan> GetWindow() => Task.FromResult(state.Window);
5657
public Task<int> GetCount() => Task.FromResult(state.Count);
5758
public Task<bool> IsEmpty() => Task.FromResult(state.IsEmpty);
59+
public Task<bool> Contains(string value) => Task.FromResult(state.Contains(value));
5860
public Task<List<string>> GetAll() => Task.FromResult(state.ToList());
5961

6062
public Task<(int, string[])> CopyToArray(int arraySize, int arrayIndex)
@@ -117,6 +119,35 @@ public async Task BasicOperations()
117119
Assert.Equal("two", items1.Last());
118120
}
119121

122+
[Fact]
123+
public async Task Contains()
124+
{
125+
var grain = GetGrain("contains");
126+
await grain.SetWindow(TimeSpan.FromSeconds(10));
127+
128+
Assert.False(await grain.Contains("one"));
129+
130+
await grain.Enqueue("one"); // t=0
131+
fixture.TimeProvider.Advance(TimeSpan.FromSeconds(5));
132+
await grain.Enqueue("two"); // t=5
133+
134+
Assert.True(await grain.Contains("one"));
135+
Assert.True(await grain.Contains("two"));
136+
Assert.False(await grain.Contains("three"));
137+
138+
// Test after item expiration
139+
fixture.TimeProvider.Advance(TimeSpan.FromSeconds(6)); // Time is now t=11. "one" is 11s old and expired.
140+
await grain.Enqueue("three"); // Enqueue triggers purge. "one" is removed.
141+
142+
Assert.False(await grain.Contains("one"));
143+
Assert.True(await grain.Contains("two"));
144+
Assert.True(await grain.Contains("three"));
145+
146+
await grain.TryDequeue();
147+
Assert.False(await grain.Contains("two"));
148+
Assert.True(await grain.Contains("three"));
149+
}
150+
120151
[Fact]
121152
public async Task TryDequeueRemovesOldestItem()
122153
{

DurableStateMachines/DurableRingBuffer.cs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,16 @@ public interface IDurableRingBuffer<T> : IEnumerable<T>, IReadOnlyCollection<T>
5454
/// <returns><c>true</c> if an element was removed and returned from the buffer successfully; otherwise, <c>false</c>.</returns>
5555
bool TryDequeue([MaybeNullWhen(false)] out T result);
5656

57+
/// <summary>
58+
/// Determines whether the buffer contains a specific value.
59+
/// <para>
60+
/// The comparison is performed using <see cref="EqualityComparer{T}.Default"/>.
61+
/// </para>
62+
/// </summary>
63+
/// <param name="item">The object to locate in the buffer. The value can be <c>null</c> for reference types.</param>
64+
/// <returns><c>true</c> if <paramref name="item"/> is found in the buffer; otherwise, <c>false</c>.</returns>
65+
bool Contains(T item);
66+
5767
/// <summary>
5868
/// Copies the elements of the buffer to an array, starting at a particular array index.
5969
/// The elements are copied in their logical order (from oldest to newest).
@@ -292,6 +302,7 @@ public void Clear()
292302
}
293303
}
294304

305+
public bool Contains(T item) => _buffer.Contains(item);
295306
public int CopyTo(T[] array, int arrayIndex) => _buffer.CopyTo(array, arrayIndex);
296307
public int CopyTo(Span<T> destination) => _buffer.CopyTo(destination);
297308

@@ -356,6 +367,50 @@ internal sealed class RingBuffer<T> : IEnumerable<T>
356367
public bool IsEmpty => _count == 0;
357368
public bool IsFull => _count == Capacity;
358369

370+
public bool Contains(T item)
371+
{
372+
if (IsEmpty)
373+
{
374+
return false;
375+
}
376+
377+
var span = new ReadOnlySpan<T>(_buffer);
378+
379+
// Case 1: The items are in a single contiguous block: [_, _, T, T, H, _]
380+
if (_tail < _head)
381+
{
382+
return Exists(span, _tail, _count, item);
383+
}
384+
385+
// Case 2: The items are wrapped around the end of the buffer: [T, T, H, _, T]
386+
// This also handles the case where the buffer is full (_head == _tail).
387+
388+
// We search the first segment (from the tail to the end of the buffer).
389+
if (Exists(span, _tail, Capacity - _tail, item))
390+
{
391+
return true;
392+
}
393+
394+
// Otherwise, we search the second segment (from the start of the buffer up to the head).
395+
return Exists(span, 0, _head, item);
396+
397+
static bool Exists(ReadOnlySpan<T> buffer, int index, int segmentLength, T item)
398+
{
399+
var comparer = EqualityComparer<T>.Default;
400+
var end = index + segmentLength;
401+
402+
for (int i = index; i < end; i++)
403+
{
404+
if (comparer.Equals(buffer[i], item))
405+
{
406+
return true;
407+
}
408+
}
409+
410+
return false;
411+
}
412+
}
413+
359414
public void Enqueue(T item)
360415
{
361416
_buffer[_head] = item;

DurableStateMachines/DurableRingBufferCollection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,7 @@ internal sealed class RingBufferProxy(TKey key,
408408
public bool IsEmpty => Buffer.IsEmpty;
409409
public bool IsFull => Buffer.IsFull;
410410

411+
public bool Contains(TValue item) => Buffer.Contains(item);
411412
public bool SetCapacity(int capacity) => collection.SetBufferCapacity(key, capacity);
412413
public void Enqueue(TValue item) => collection.EnqueueItem(key, item);
413414
public bool TryDequeue([MaybeNullWhen(false)] out TValue item) => collection.TryDequeueItem(key, out item);

DurableStateMachines/DurableTimeWindowBuffer.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ public interface IDurableTimeWindowBuffer<T> : IEnumerable<T>, IReadOnlyCollecti
4949
/// <returns><c>true</c> if an element was removed and returned from the buffer successfully; otherwise, <c>false</c>.</returns>
5050
bool TryDequeue([MaybeNullWhen(false)] out T result);
5151

52+
/// <summary>
53+
/// Determines whether the buffer contains a specific value.
54+
/// <para>
55+
/// The comparison is performed using <see cref="EqualityComparer{T}.Default"/>.
56+
/// </para>
57+
/// </summary>
58+
/// <param name="item">The object to locate in the buffer. The value can be <c>null</c> for reference types.</param>
59+
/// <returns><c>true</c> if <paramref name="item"/> is found in the buffer; otherwise, <c>false</c>.</returns>
60+
bool Contains(T item);
61+
5262
/// <summary>
5363
/// Copies the elements of the buffer to an array, starting at a particular array index.
5464
/// The elements are copied in their logical order (from oldest to newest).
@@ -297,6 +307,7 @@ public void Clear()
297307
}
298308
}
299309

310+
public bool Contains(T item) => _buffer.Contains(item);
300311
public int CopyTo(T[] array, int arrayIndex) => _buffer.CopyTo(array, arrayIndex);
301312
public int CopyTo(Span<T> destination) => _buffer.CopyTo(destination);
302313

@@ -365,6 +376,21 @@ public static void ThrowIfTooShortWindow(long windowSeconds)
365376
}
366377
}
367378

379+
public bool Contains(T item)
380+
{
381+
var comparer = EqualityComparer<T>.Default;
382+
383+
foreach (var (current, _) in _buffer)
384+
{
385+
if (comparer.Equals(current, item))
386+
{
387+
return true;
388+
}
389+
}
390+
391+
return false;
392+
}
393+
368394
public void Enqueue(T item, long timestamp)
369395
{
370396
_buffer.Enqueue((item, timestamp));

DurableStateMachines/DurableTimeWindowBufferCollection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,7 @@ internal sealed class TimeWindowBufferProxy(TKey key,
412412
public bool IsEmpty => Buffer.IsEmpty;
413413
public TimeSpan Window => TimeSpan.FromSeconds(Buffer.WindowSeconds);
414414

415+
public bool Contains(TValue item) => Buffer.Contains(item);
415416
public bool SetWindow(TimeSpan window) => collection.SetBufferWindow(key, (long)window.TotalSeconds);
416417
public void Enqueue(TValue item) => collection.EnqueueItem(key, item);
417418
public bool TryDequeue([MaybeNullWhen(false)] out TValue item) => collection.TryDequeueItem(key, out item);

0 commit comments

Comments
 (0)