Skip to content

Commit 751686f

Browse files
committed
propose support for XREADGROUP CLAIM
1 parent 9c6023f commit 751686f

File tree

9 files changed

+123
-19
lines changed

9 files changed

+123
-19
lines changed

src/StackExchange.Redis/APITypes/StreamEntry.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,19 @@ public StreamEntry(RedisValue id, NameValueEntry[] values)
1414
{
1515
Id = id;
1616
Values = values;
17+
IdleTime = null;
18+
DeliveryCount = 0;
19+
}
20+
21+
/// <summary>
22+
/// Creates an stream entry.
23+
/// </summary>
24+
public StreamEntry(RedisValue id, NameValueEntry[] values, TimeSpan? idleTime, int deliveryCount)
25+
{
26+
Id = id;
27+
Values = values;
28+
IdleTime = idleTime;
29+
DeliveryCount = deliveryCount;
1730
}
1831

1932
/// <summary>
@@ -51,6 +64,18 @@ public RedisValue this[RedisValue fieldName]
5164
}
5265
}
5366

67+
/// <summary>
68+
/// Delivery count - the number of times this entry has been delivered: 0 for new messages that haven't been delivered before,
69+
/// 1+ for claimed messages (previously unacknowledged entries).
70+
/// </summary>
71+
public int DeliveryCount { get; }
72+
73+
/// <summary>
74+
/// Idle time in milliseconds - the number of milliseconds elapsed since this entry was last delivered to a consumer.
75+
/// </summary>
76+
/// <remarks>This member is populated when using <c>XREADGROUP</c> with <c>CLAIM</c>.</remarks>
77+
public TimeSpan? IdleTime { get; }
78+
5479
/// <summary>
5580
/// Indicates that the Redis Stream Entry is null.
5681
/// </summary>

src/StackExchange.Redis/Interfaces/IDatabase.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3004,7 +3004,25 @@ IEnumerable<SortedSetEntry> SortedSetScan(
30043004
/// <para>Equivalent of calling <c>XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</c>.</para>
30053005
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
30063006
/// </remarks>
3007-
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
3007+
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags);
3008+
3009+
/// <summary>
3010+
/// Read from multiple streams into the given consumer group.
3011+
/// The consumer group with the given <paramref name="groupName"/> will need to have been created for each stream prior to calling this method.
3012+
/// </summary>
3013+
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
3014+
/// <param name="groupName">The name of the consumer group.</param>
3015+
/// <param name="consumerName">The name of the consumer.</param>
3016+
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
3017+
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
3018+
/// <param name="claimMinIdleTime">Auto-claim messages that have been idle for at least this long.</param>
3019+
/// <param name="flags">The flags to use for this operation.</param>
3020+
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
3021+
/// <remarks>
3022+
/// <para>Equivalent of calling <c>XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</c>.</para>
3023+
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
3024+
/// </remarks>
3025+
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
30083026

30093027
/// <summary>
30103028
/// Trim the stream to a specified maximum length.

src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,10 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
731731
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags);
732732

733733
/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, CommandFlags)"/>
734-
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
734+
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags);
735+
736+
/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, TimeSpan?, CommandFlags)"/>
737+
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
735738

736739
/// <inheritdoc cref="IDatabase.StreamTrim(RedisKey, int, bool, CommandFlags)"/>
737740
Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags);

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,9 @@ public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions
696696
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
697697
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, flags);
698698

699+
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
700+
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, claimMinIdleTime, flags);
701+
699702
public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
700703
Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags);
701704

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,9 @@ public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValu
678678
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
679679
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, flags);
680680

681+
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
682+
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, claimMinIdleTime, flags);
683+
681684
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
682685
Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags);
683686

src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,6 @@ StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, Stack
746746
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
747747
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position = null, int? count = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
748748
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamEntry[]!
749-
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
750749
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisStream[]!
751750
StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> long
752751
StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode mode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
@@ -991,7 +990,6 @@ StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey
991990
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
992991
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position = null, int? count = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
993992
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
994-
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
995993
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
996994
StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<long>!
997995
StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode mode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
@@ -2052,3 +2050,10 @@ StackExchange.Redis.IServer.ExecuteAsync(int? database, string! command, System.
20522050
[SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByMember(StackExchange.Redis.RedisValue member) -> StackExchange.Redis.VectorSetSimilaritySearchRequest!
20532051
[SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByVector(System.ReadOnlyMemory<float> vector) -> StackExchange.Redis.VectorSetSimilaritySearchRequest!
20542052
StackExchange.Redis.RedisChannel.WithKeyRouting() -> StackExchange.Redis.RedisChannel
2053+
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, System.TimeSpan? claimMinIdleTime = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
2054+
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, bool noAck, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisStream[]!
2055+
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, System.TimeSpan? claimMinIdleTime = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
2056+
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, bool noAck, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
2057+
StackExchange.Redis.StreamEntry.DeliveryCount.get -> int
2058+
StackExchange.Redis.StreamEntry.IdleTime.get -> System.TimeSpan?
2059+
StackExchange.Redis.StreamEntry.StreamEntry(StackExchange.Redis.RedisValue id, StackExchange.Redis.NameValueEntry[]! values, System.TimeSpan? idleTime, int deliveryCount) -> void

0 commit comments

Comments
 (0)