Skip to content

Commit c118744

Browse files
committed
support claimMinIdleTime on single-stream read API
1 parent 3b27436 commit c118744

File tree

8 files changed

+121
-22
lines changed

8 files changed

+121
-22
lines changed

src/StackExchange.Redis/Interfaces/IDatabase.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2971,7 +2971,22 @@ IEnumerable<SortedSetEntry> SortedSetScan(
29712971
/// <param name="flags">The flags to use for this operation.</param>
29722972
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
29732973
/// <remarks><seealso href="https://redis.io/commands/xreadgroup"/></remarks>
2974-
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
2974+
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, bool noAck, CommandFlags flags);
2975+
2976+
/// <summary>
2977+
/// Read messages from a stream into an associated consumer group.
2978+
/// </summary>
2979+
/// <param name="key">The key of the stream.</param>
2980+
/// <param name="groupName">The name of the consumer group.</param>
2981+
/// <param name="consumerName">The consumer name.</param>
2982+
/// <param name="position">The position from which to read the stream. Defaults to <see cref="StreamPosition.NewMessages"/> when <see langword="null"/>.</param>
2983+
/// <param name="count">The maximum number of messages to return.</param>
2984+
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
2985+
/// <param name="claimMinIdleTime">Auto-claim messages that have been idle for at least this long.</param>
2986+
/// <param name="flags">The flags to use for this operation.</param>
2987+
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
2988+
/// <remarks><seealso href="https://redis.io/commands/xreadgroup"/></remarks>
2989+
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
29752990

29762991
/// <summary>
29772992
/// Read from multiple streams into the given consumer group.

src/StackExchange.Redis/Interfaces/IDatabaseAsync.VectorSets.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,7 @@ Task<bool> VectorSetSetAttributesJsonAsync(
9292
RedisKey key,
9393
VectorSetSimilaritySearchRequest query,
9494
CommandFlags flags = CommandFlags.None);
95+
96+
/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, TimeSpan?, CommandFlags)"/>
97+
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
9598
}

src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -725,17 +725,17 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
725725
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags);
726726

727727
/// <inheritdoc cref="IDatabase.StreamReadGroup(RedisKey, RedisValue, RedisValue, RedisValue?, int?, bool, CommandFlags)"/>
728-
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
728+
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, bool noAck, CommandFlags flags);
729+
730+
/// <inheritdoc cref="IDatabase.StreamReadGroup(RedisKey, RedisValue, RedisValue, RedisValue?, int?, bool, TimeSpan?, CommandFlags)"/>
731+
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
729732

730733
/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, CommandFlags)"/>
731734
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags);
732735

733736
/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, CommandFlags)"/>
734737
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags);
735738

736-
/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, TimeSpan?, CommandFlags)"/>
737-
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
738-
739739
/// <inheritdoc cref="IDatabase.StreamTrim(RedisKey, int, bool, CommandFlags)"/>
740740
Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags);
741741

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,9 @@ public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupNa
690690
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
691691
Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, noAck, flags);
692692

693+
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
694+
Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, noAck, claimMinIdleTime, flags);
695+
693696
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags) =>
694697
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, flags);
695698

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,9 @@ public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisVa
672672
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
673673
Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, noAck, flags);
674674

675+
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
676+
Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, noAck, claimMinIdleTime, flags);
677+
675678
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags) =>
676679
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, flags);
677680

src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -744,7 +744,8 @@ StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey
744744
StackExchange.Redis.IDatabase.StreamRange(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
745745
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
746746
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
747-
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position = null, int? count = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
747+
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, bool noAck, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamEntry[]!
748+
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position = null, int? count = null, bool noAck = false, System.TimeSpan? claimMinIdleTime = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
748749
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamEntry[]!
749750
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisStream[]!
750751
StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> long
@@ -988,7 +989,8 @@ StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redi
988989
StackExchange.Redis.IDatabaseAsync.StreamRangeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
989990
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
990991
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
991-
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position = null, int? count = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
992+
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, bool noAck, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
993+
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position = null, int? count = null, bool noAck = false, System.TimeSpan? claimMinIdleTime = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
992994
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
993995
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
994996
StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<long>!

src/StackExchange.Redis/RedisDatabase.cs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3262,29 +3262,38 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int
32623262
return ExecuteAsync(msg, ResultProcessor.MultiStream, defaultValue: Array.Empty<RedisStream>());
32633263
}
32643264

3265-
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags)
3266-
{
3267-
return StreamReadGroup(
3265+
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags) =>
3266+
StreamReadGroup(
32683267
key,
32693268
groupName,
32703269
consumerName,
32713270
position,
32723271
count,
32733272
false,
3273+
null,
32743274
flags);
3275-
}
32763275

32773276
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None)
3278-
{
3279-
var actualPosition = position ?? StreamPosition.NewMessages;
3277+
=> StreamReadGroup(
3278+
key,
3279+
groupName,
3280+
consumerName,
3281+
position,
3282+
count,
3283+
noAck,
3284+
null,
3285+
flags);
32803286

3287+
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None)
3288+
{
32813289
var msg = GetStreamReadGroupMessage(
32823290
key,
32833291
groupName,
32843292
consumerName,
3285-
StreamPosition.Resolve(actualPosition, RedisCommand.XREADGROUP),
3293+
StreamPosition.Resolve(position ?? StreamPosition.NewMessages, RedisCommand.XREADGROUP),
32863294
count,
32873295
noAck,
3296+
claimMinIdleTime,
32883297
flags);
32893298

32903299
return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip, defaultValue: Array.Empty<StreamEntry>());
@@ -3299,20 +3308,31 @@ public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupNa
32993308
position,
33003309
count,
33013310
false,
3311+
null,
33023312
flags);
33033313
}
33043314

33053315
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None)
3306-
{
3307-
var actualPosition = position ?? StreamPosition.NewMessages;
3316+
=> StreamReadGroupAsync(
3317+
key,
3318+
groupName,
3319+
consumerName,
3320+
position,
3321+
count,
3322+
noAck,
3323+
null,
3324+
flags);
33083325

3326+
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None)
3327+
{
33093328
var msg = GetStreamReadGroupMessage(
33103329
key,
33113330
groupName,
33123331
consumerName,
3313-
StreamPosition.Resolve(actualPosition, RedisCommand.XREADGROUP),
3332+
StreamPosition.Resolve(position ?? StreamPosition.NewMessages, RedisCommand.XREADGROUP),
33143333
count,
33153334
noAck,
3335+
claimMinIdleTime,
33163336
flags);
33173337

33183338
return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip, defaultValue: Array.Empty<StreamEntry>());
@@ -4838,8 +4858,8 @@ private Message GetStreamRangeMessage(RedisKey key, RedisValue? minId, RedisValu
48384858
values);
48394859
}
48404860

4841-
private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, bool noAck, CommandFlags flags) =>
4842-
new SingleStreamReadGroupCommandMessage(Database, flags, key, groupName, consumerName, afterId, count, noAck);
4861+
private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, bool noAck, TimeSpan? claimMinIdleTime, CommandFlags flags) =>
4862+
new SingleStreamReadGroupCommandMessage(Database, flags, key, groupName, consumerName, afterId, count, noAck, claimMinIdleTime);
48434863

48444864
private sealed class SingleStreamReadGroupCommandMessage : Message.CommandKeyBase // XREADGROUP with single stream. eg XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
48454865
{
@@ -4849,8 +4869,9 @@ private sealed class SingleStreamReadGroupCommandMessage : Message.CommandKeyBas
48494869
private readonly int? count;
48504870
private readonly bool noAck;
48514871
private readonly int argCount;
4872+
private readonly TimeSpan? claimMinIdleTime;
48524873

4853-
public SingleStreamReadGroupCommandMessage(int db, CommandFlags flags, RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, bool noAck)
4874+
public SingleStreamReadGroupCommandMessage(int db, CommandFlags flags, RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, bool noAck, TimeSpan? claimMinIdleTime)
48544875
: base(db, flags, RedisCommand.XREADGROUP, key)
48554876
{
48564877
if (count.HasValue && count <= 0)
@@ -4867,7 +4888,8 @@ public SingleStreamReadGroupCommandMessage(int db, CommandFlags flags, RedisKey
48674888
this.afterId = afterId;
48684889
this.count = count;
48694890
this.noAck = noAck;
4870-
argCount = 6 + (count.HasValue ? 2 : 0) + (noAck ? 1 : 0);
4891+
this.claimMinIdleTime = claimMinIdleTime;
4892+
argCount = 6 + (count.HasValue ? 2 : 0) + (noAck ? 1 : 0) + (claimMinIdleTime.HasValue ? 2 : 0);
48714893
}
48724894

48734895
protected override void WriteImpl(PhysicalConnection physical)
@@ -4888,6 +4910,12 @@ protected override void WriteImpl(PhysicalConnection physical)
48884910
physical.WriteBulkString(StreamConstants.NoAck);
48894911
}
48904912

4913+
if (claimMinIdleTime.HasValue)
4914+
{
4915+
physical.WriteBulkString(StreamConstants.Claim);
4916+
physical.WriteBulkString(claimMinIdleTime.Value.TotalMilliseconds);
4917+
}
4918+
48914919
physical.WriteBulkString(StreamConstants.Streams);
48924920
physical.Write(Key);
48934921
physical.WriteBulkString(afterId);

0 commit comments

Comments
 (0)