@@ -495,8 +495,8 @@ public async Task StreamConsumerGroupSetId()
495495
496496 var db = conn . GetDatabase ( ) ;
497497 var key = Me ( ) ;
498- const string groupName = "test_group" ,
499- consumer = "consumer" ;
498+ await db . KeyDeleteAsync ( key , CommandFlags . FireAndForget ) ;
499+ const string groupName = "test_group" , consumer = "consumer" ;
500500
501501 // Create a stream
502502 db . StreamAdd ( key , "field1" , "value1" ) ;
@@ -519,6 +519,56 @@ public async Task StreamConsumerGroupSetId()
519519 Assert . Equal ( 2 , secondRead . Length ) ;
520520 }
521521
522+ [ Fact ]
523+ public async Task StreamConsumerGroupAutoClaim ( )
524+ {
525+ await using var conn = Create ( require : RedisFeatures . v8_4_0_rc1 ) ;
526+
527+ var db = conn . GetDatabase ( ) ;
528+ var key = Me ( ) ;
529+ await db . KeyDeleteAsync ( key , CommandFlags . FireAndForget ) ;
530+ const string groupName = "test_group" , consumer = "consumer" ;
531+
532+ // Create a group and set the position to deliver new messages only.
533+ await db . StreamCreateConsumerGroupAsync ( key , groupName , StreamPosition . NewMessages ) ;
534+
535+ // add some entries
536+ await db . StreamAddAsync ( key , "field1" , "value1" ) ;
537+ await db . StreamAddAsync ( key , "field2" , "value2" ) ;
538+
539+ var idleTime = TimeSpan . FromMilliseconds ( 100 ) ;
540+ // Read into the group, expect the two entries; we don't expect any data
541+ // here, at least on a fast server, because it hasn't been idle long enough.
542+ StreamPosition [ ] positions = [ new ( key , StreamPosition . NewMessages ) ] ;
543+ var groups = await db . StreamReadGroupAsync ( positions , groupName , consumer , noAck : false , countPerStream : 10 , claimMinIdleTime : idleTime ) ;
544+ var grp = Assert . Single ( groups ) ;
545+ Assert . Equal ( key , grp . Key ) ;
546+ Assert . Equal ( 2 , grp . Entries . Length ) ;
547+ foreach ( var entry in grp . Entries )
548+ {
549+ Assert . Equal ( 0 , entry . DeliveryCount ) ; // never delivered before
550+ Assert . Equal ( TimeSpan . Zero , entry . IdleTime ) ; // never delivered before
551+ }
552+
553+ // now repeat immediately; we didn't "ack", so they're still pending, but not idle long enough
554+ groups = await db . StreamReadGroupAsync ( positions , groupName , consumer , noAck : false , countPerStream : 10 , claimMinIdleTime : idleTime ) ;
555+ Assert . Empty ( groups ) ; // nothing available from any group
556+
557+ // wait long enough for the messages to be considered idle
558+ await Task . Delay ( idleTime + idleTime ) ;
559+
560+ // repeat again; we should get the entries
561+ groups = await db . StreamReadGroupAsync ( positions , groupName , consumer , noAck : false , countPerStream : 10 , claimMinIdleTime : idleTime ) ;
562+ grp = Assert . Single ( groups ) ;
563+ Assert . Equal ( key , grp . Key ) ;
564+ Assert . Equal ( 2 , grp . Entries . Length ) ;
565+ foreach ( var entry in grp . Entries )
566+ {
567+ Assert . Equal ( 1 , entry . DeliveryCount ) ; // this is a redelivery
568+ Assert . True ( entry . IdleTime > TimeSpan . Zero ) ; // and is considered idle
569+ }
570+ }
571+
522572 [ Fact ]
523573 public async Task StreamConsumerGroupWithNoConsumers ( )
524574 {
0 commit comments