Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions lib/redis_client/cluster/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,28 @@ def load(nodes, slow_command_timeout: -1) # rubocop:disable Metrics/AbcSize

private

def parse_command_reply(rows) # rubocop:disable Metrics/CyclomaticComplexity
def parse_command_reply(rows) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
rows&.each_with_object({}) do |row, acc|
next if row.first.nil?

# TODO: in redis 7.0 or later, subcommand information included in the command reply

pos = case row.first
when 'eval', 'evalsha', 'zinterstore', 'zunionstore' then 3
when 'object' then 2
when 'object', 'xgroup' then 2
when 'migrate', 'xread', 'xreadgroup' then 0
else row[3]
end

writable = case row.first
when 'xgroup' then true
else row[2].include?('write')
end

acc[row.first] = ::RedisClient::Cluster::Command::Detail.new(
first_key_position: pos,
key_step: row[5],
write?: row[2].include?('write'),
write?: writable,
readonly?: row[2].include?('readonly')
)
end.freeze || EMPTY_HASH
Expand Down Expand Up @@ -115,8 +122,11 @@ def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticCo
end

def determine_optional_key_position(command, option_name)
idx = command.map { |e| e.to_s.downcase(:ascii) }.index(option_name)
idx.nil? ? 0 : idx + 1
command.each_with_index do |e, i|
return i + 1 if e.to_s.downcase(:ascii) == option_name
end

0
end
end
end
Expand Down
41 changes: 41 additions & 0 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,47 @@ def test_other_pubsub_commands
ps.close
end

def test_stream_commands
@client.call('xadd', '{stream}1', '*', 'mesage', 'foo')
@client.call('xadd', '{stream}1', '*', 'mesage', 'bar')
@client.call('xadd', '{stream}2', '*', 'mesage', 'baz')
@client.call('xadd', '{stream}2', '*', 'mesage', 'zap')
wait_for_replication

consumer = new_test_client
got = consumer.call('xread', 'streams', '{stream}1', '{stream}2', '0', '0')
consumer.close

got = got.to_h if TEST_REDIS_MAJOR_VERSION < 6

assert_equal('foo', got.fetch('{stream}1')[0][1][1])
assert_equal('bar', got.fetch('{stream}1')[1][1][1])
assert_equal('baz', got.fetch('{stream}2')[0][1][1])
assert_equal('zap', got.fetch('{stream}2')[1][1][1])
end

def test_stream_group_commands
@client.call('xadd', '{stream}1', '*', 'task', 'data1')
@client.call('xadd', '{stream}1', '*', 'task', 'data2')
@client.call('xgroup', 'create', '{stream}1', 'worker', '0')
wait_for_replication

consumer1 = new_test_client
consumer2 = new_test_client
got1 = consumer1.call('xreadgroup', 'group', 'worker', 'consumer1', 'count', '1', 'streams', '{stream}1', '>')
got2 = consumer2.call('xreadgroup', 'group', 'worker', 'consumer2', 'count', '1', 'streams', '{stream}1', '>')
consumer1.close
consumer2.close

if TEST_REDIS_MAJOR_VERSION < 6
got1 = got1.to_h
got2 = got2.to_h
end

assert_equal('data1', got1.fetch('{stream}1')[0][1][1])
assert_equal('data2', got2.fetch('{stream}1')[0][1][1])
end

def test_with_method
assert_raises(NotImplementedError) { @client.with }
end
Expand Down
Loading