Skip to content

Commit efdba1a

Browse files
xgroup_createconsumer (#1553)
1 parent 41e3f56 commit efdba1a

File tree

2 files changed

+28
-0
lines changed

2 files changed

+28
-0
lines changed

redis/commands.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1870,6 +1870,18 @@ def xgroup_destroy(self, name, groupname):
18701870
"""
18711871
return self.execute_command('XGROUP DESTROY', name, groupname)
18721872

1873+
def xgroup_createconsumer(self, name, groupname, consumername):
1874+
"""
1875+
Consumers in a consumer group are auto-created every time a new
1876+
consumer name is mentioned by some command.
1877+
They can be explicitly created by using this command.
1878+
name: name of the stream.
1879+
groupname: name of the consumer group.
1880+
consumername: name of consumer to create.
1881+
"""
1882+
return self.execute_command('XGROUP CREATECONSUMER', name, groupname,
1883+
consumername)
1884+
18731885
def xgroup_setid(self, name, groupname, id):
18741886
"""
18751887
Set the consumer group last delivered ID to something else.

tests/test_commands.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2731,6 +2731,22 @@ def test_xgroup_delconsumer(self, r):
27312731
# deleting the consumer should return 2 pending messages
27322732
assert r.xgroup_delconsumer(stream, group, consumer) == 2
27332733

2734+
@skip_if_server_version_lt('6.2.0')
2735+
def test_xgroup_createconsumer(self, r):
2736+
stream = 'stream'
2737+
group = 'group'
2738+
consumer = 'consumer'
2739+
r.xadd(stream, {'foo': 'bar'})
2740+
r.xadd(stream, {'foo': 'bar'})
2741+
r.xgroup_create(stream, group, 0)
2742+
assert r.xgroup_createconsumer(stream, group, consumer) == 1
2743+
2744+
# read all messages from the group
2745+
r.xreadgroup(group, consumer, streams={stream: '>'})
2746+
2747+
# deleting the consumer should return 2 pending messages
2748+
assert r.xgroup_delconsumer(stream, group, consumer) == 2
2749+
27342750
@skip_if_server_version_lt('5.0.0')
27352751
def test_xgroup_destroy(self, r):
27362752
stream = 'stream'

0 commit comments

Comments
 (0)