Skip to content

Commit 73870ec

Browse files
committed
add XAUTOCLAIM command, added to Redis in 6.2
1 parent ae80708 commit 73870ec

File tree

3 files changed

+114
-1
lines changed

3 files changed

+114
-1
lines changed

lib/redis.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3280,6 +3280,38 @@ def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
32803280
synchronize { |client| client.call(args, &blk) }
32813281
end
32823282

3283+
# Transfers ownership of pending stream entries that match the specified criteria.
3284+
#
3285+
# @example Claim next pending message stuck > 5 minutes and mark as retry
3286+
# redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0')
3287+
# @example Claim 50 next pending messages stuck > 5 minutes and mark as retry
3288+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', count: 50)
3289+
# @example Claim next pending message stuck > 5 minutes and don't mark as retry
3290+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', justid: true)
3291+
# @example Claim next pending message after this id stuck > 5 minutes and mark as retry
3292+
# redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '1641321233-0')
3293+
#
3294+
# @param key [String] the stream key
3295+
# @param group [String] the consumer group name
3296+
# @param consumer [String] the consumer name
3297+
# @param min_idle_time [Integer] the number of milliseconds
3298+
# @param start [String] entry id to start scanning from or 0-0 for everything
3299+
# @param count [Integer] number of messages to claim (default 1)
3300+
# @param justid [Boolean] whether to fetch just an array of entry ids or not.
3301+
# Does not increment retry count when true
3302+
#
3303+
# @return [Hash{String => Hash}] the entries successfully claimed
3304+
# @return [Array<String>] the entry ids successfully claimed if justid option is `true`
3305+
def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)
3306+
args = [:xautoclaim, key, group, consumer, min_idle_time, start]
3307+
if count
3308+
args << 'COUNT' << count.to_s
3309+
end
3310+
args << 'JUSTID' if justid
3311+
blk = justid ? HashifyStreamAutoclaimJustId : HashifyStreamAutoclaim
3312+
synchronize { |client| client.call(args, &blk) }
3313+
end
3314+
32833315
# Fetches not acknowledging pending entries
32843316
#
32853317
# @example With key and group
@@ -3490,6 +3522,20 @@ def method_missing(command, *args) # rubocop:disable Style/MissingRespondToMissi
34903522
end
34913523
}
34923524

3525+
HashifyStreamAutoclaim = lambda { |reply|
3526+
{
3527+
'next' => reply[0],
3528+
'entries' => reply[1].map { |entry| [entry[0], entry[1].each_slice(2).to_h] }
3529+
}
3530+
}
3531+
3532+
HashifyStreamAutoclaimJustId = lambda { |reply|
3533+
{
3534+
'next' => reply[0],
3535+
'entries' => reply[1]
3536+
}
3537+
}
3538+
34933539
HashifyStreamPendings = lambda { |reply|
34943540
{
34953541
'size' => reply[0],

makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
REDIS_BRANCH ?= 6.0
1+
REDIS_BRANCH ?= 6.2
22
TMP := tmp
33
BUILD_DIR := ${TMP}/cache/redis-${REDIS_BRANCH}
44
TARBALL := ${TMP}/redis-${REDIS_BRANCH}.tar.gz

test/lint/streams.rb

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
module Lint
44
module Streams
55
MIN_REDIS_VERSION = '4.9.0'
6+
MIN_REDIS_VERSION_XAUTOCLAIM = '6.2.0'
67
ENTRY_ID_FORMAT = /\d+-\d+/.freeze
78

89
def setup
@@ -633,6 +634,72 @@ def test_xclaim_with_invalid_arguments
633634
assert_raises(Redis::CommandError) { redis.xclaim('', '', '', '', '') }
634635
end
635636

637+
def test_xautoclaim
638+
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)
639+
640+
redis.xadd('s1', { f: 'v1' }, id: '0-1')
641+
redis.xgroup(:create, 's1', 'g1', '$')
642+
redis.xadd('s1', { f: 'v2' }, id: '0-2')
643+
redis.xadd('s1', { f: 'v3' }, id: '0-3')
644+
redis.xreadgroup('g1', 'c1', 's1', '>')
645+
sleep 0.01
646+
647+
actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0')
648+
649+
assert_equal '0-0', actual['next']
650+
assert_equal %w(0-2 0-3), actual['entries'].map(&:first)
651+
assert_equal(%w(v2 v3), actual['entries'].map { |i| i.last['f'] })
652+
end
653+
654+
def test_xautoclaim_with_justid_option
655+
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)
656+
657+
redis.xadd('s1', { f: 'v1' }, id: '0-1')
658+
redis.xgroup(:create, 's1', 'g1', '$')
659+
redis.xadd('s1', { f: 'v2' }, id: '0-2')
660+
redis.xadd('s1', { f: 'v3' }, id: '0-3')
661+
redis.xreadgroup('g1', 'c1', 's1', '>')
662+
sleep 0.01
663+
664+
actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0', justid: true)
665+
666+
assert_equal '0-0', actual['next']
667+
assert_equal %w(0-2 0-3), actual['entries']
668+
end
669+
670+
def test_xautoclaim_with_count_option
671+
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)
672+
673+
redis.xadd('s1', { f: 'v1' }, id: '0-1')
674+
redis.xgroup(:create, 's1', 'g1', '$')
675+
redis.xadd('s1', { f: 'v2' }, id: '0-2')
676+
redis.xadd('s1', { f: 'v3' }, id: '0-3')
677+
redis.xreadgroup('g1', 'c1', 's1', '>')
678+
sleep 0.01
679+
680+
actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0', count: 1)
681+
682+
assert_equal '0-3', actual['next']
683+
assert_equal %w(0-2), actual['entries'].map(&:first)
684+
assert_equal(%w(v2), actual['entries'].map { |i| i.last['f'] })
685+
end
686+
687+
def test_xautoclaim_with_larger_interval
688+
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)
689+
690+
redis.xadd('s1', { f: 'v1' }, id: '0-1')
691+
redis.xgroup(:create, 's1', 'g1', '$')
692+
redis.xadd('s1', { f: 'v2' }, id: '0-2')
693+
redis.xadd('s1', { f: 'v3' }, id: '0-3')
694+
redis.xreadgroup('g1', 'c1', 's1', '>')
695+
sleep 0.01
696+
697+
actual = redis.xautoclaim('s1', 'g1', 'c2', 36_000, '0-0')
698+
699+
assert_equal '0-0', actual['next']
700+
assert_equal [], actual['entries']
701+
end
702+
636703
def test_xpending
637704
redis.xadd('s1', { f: 'v1' }, id: '0-1')
638705
redis.xgroup(:create, 's1', 'g1', '$')

0 commit comments

Comments
 (0)