Skip to content

Commit 0e812a0

Browse files
committed
Implement the call_v style interfaces
That's what redis-rb 5.0 uses.
1 parent 6e772a3 commit 0e812a0

File tree

7 files changed

+183
-127
lines changed

7 files changed

+183
-127
lines changed

lib/redis_client/cluster.rb

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,41 @@ class Cluster
1010

1111
def initialize(config, pool: nil, **kwargs)
1212
@router = ::RedisClient::Cluster::Router.new(config, pool: pool, **kwargs)
13+
@command_builder = config.command_builder
1314
end
1415

1516
def inspect
1617
"#<#{self.class.name} #{@router.node.node_keys.join(', ')}>"
1718
end
1819

19-
def call(*command, **kwargs, &block)
20-
@router.send_command(:call, *command, **kwargs, &block)
20+
def call(*args, **kwargs, &block)
21+
command = @command_builder.generate(args, kwargs)
22+
@router.send_command(:call_v, command, &block)
2123
end
2224

23-
def call_once(*command, **kwargs, &block)
24-
@router.send_command(:call_once, *command, **kwargs, &block)
25+
def call_v(command, &block)
26+
command = @command_builder.generate(command)
27+
@router.send_command(:call_v, command, &block)
2528
end
2629

27-
def blocking_call(timeout, *command, **kwargs, &block)
28-
@router.send_command(:blocking_call, timeout, *command, **kwargs, &block)
30+
def call_once(*args, **kwargs, &block)
31+
command = @command_builder.generate(args, kwargs)
32+
@router.send_command(:call_once_v, command, &block)
33+
end
34+
35+
def call_once_v(command, &block)
36+
command = @command_builder.generate(command)
37+
@router.send_command(:call_once_v, command, &block)
38+
end
39+
40+
def blocking_call(timeout, *args, **kwargs, &block)
41+
command = @command_builder.generate(args, kwargs)
42+
@router.send_command(:blocking_call_v, command, timeout, &block)
43+
end
44+
45+
def blocking_call_v(timeout, command, &block)
46+
command = @command_builder.generate(command)
47+
@router.send_command(:blocking_call_v, command, timeout, &block)
2948
end
3049

3150
def scan(*args, **kwargs, &block)
@@ -40,34 +59,34 @@ def scan(*args, **kwargs, &block)
4059
end
4160

4261
def sscan(key, *args, **kwargs, &block)
43-
node = @router.assign_node('SSCAN', key)
44-
@router.try_send(node, :sscan, key, *args, **kwargs, &block)
62+
node = @router.assign_node(['SSCAN', key])
63+
@router.try_delegate(node, :sscan, key, *args, **kwargs, &block)
4564
end
4665

4766
def hscan(key, *args, **kwargs, &block)
48-
node = @router.assign_node('HSCAN', key)
49-
@router.try_send(node, :hscan, key, *args, **kwargs, &block)
67+
node = @router.assign_node(['HSCAN', key])
68+
@router.try_delegate(node, :hscan, key, *args, **kwargs, &block)
5069
end
5170

5271
def zscan(key, *args, **kwargs, &block)
53-
node = @router.assign_node('ZSCAN', key)
54-
@router.try_send(node, :zscan, key, *args, **kwargs, &block)
72+
node = @router.assign_node(['ZSCAN', key])
73+
@router.try_delegate(node, :zscan, key, *args, **kwargs, &block)
5574
end
5675

5776
def pipelined
58-
pipeline = ::RedisClient::Cluster::Pipeline.new(@router)
77+
pipeline = ::RedisClient::Cluster::Pipeline.new(@router, @command_builder)
5978
yield pipeline
6079
return [] if pipeline.empty? == 0
6180

6281
pipeline.execute
6382
end
6483

6584
def pubsub
66-
::RedisClient::Cluster::PubSub.new(@router)
85+
::RedisClient::Cluster::PubSub.new(@router, @command_builder)
6786
end
6887

6988
def close
70-
@router.node.call_all(:close)
89+
@router.node.each(&:close)
7190
nil
7291
end
7392

@@ -76,7 +95,8 @@ def close
7695
def method_missing(name, *args, **kwargs, &block)
7796
if @router.command_exists?(name)
7897
args.unshift(name)
79-
return @router.send_command(:call, *args, **kwargs, &block)
98+
command = @command_builder.generate(args, kwargs)
99+
return @router.send_command(:call_v, command, &block)
80100
end
81101

82102
super

lib/redis_client/cluster/node.rb

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -128,46 +128,46 @@ def find_by(node_key)
128128
@clients.fetch(node_key)
129129
end
130130

131-
def call_all(method, *args, **kwargs, &block)
131+
def call_all(method, command, args, &block)
132132
results, errors = try_map do |_, client|
133-
client.send(method, *args, **kwargs, &block)
133+
client.send(method, *args, command, &block)
134134
end
135135

136136
return results.values if errors.empty?
137137

138138
raise ::RedisClient::Cluster::ErrorCollection, errors
139139
end
140140

141-
def call_primaries(method, *args, **kwargs, &block)
141+
def call_primaries(method, command, args, &block)
142142
results, errors = try_map do |node_key, client|
143143
next if replica?(node_key)
144144

145-
client.send(method, *args, **kwargs, &block)
145+
client.send(method, *args, command, &block)
146146
end
147147

148148
return results.values if errors.empty?
149149

150150
raise ::RedisClient::Cluster::ErrorCollection, errors
151151
end
152152

153-
def call_replicas(method, *args, **kwargs, &block)
154-
return call_primaries(method, *args, **kwargs, &block) if replica_disabled?
153+
def call_replicas(method, command, args, &block)
154+
return call_primaries(method, command, args, &block) if replica_disabled?
155155

156156
replica_node_keys = @replications.values.map(&:sample)
157157
results, errors = try_map do |node_key, client|
158158
next if primary?(node_key) || !replica_node_keys.include?(node_key)
159159

160-
client.send(method, *args, **kwargs, &block)
160+
client.send(method, *args, command, &block)
161161
end
162162

163163
return results.values if errors.empty?
164164

165165
raise ::RedisClient::Cluster::ErrorCollection, errors
166166
end
167167

168-
def send_ping(method, *args, **kwargs, &block)
168+
def send_ping(method, command, args, &block)
169169
results, errors = try_map do |_, client|
170-
client.send(method, *args, **kwargs, &block)
170+
client.send(method, *args, command, &block)
171171
end
172172

173173
return results.values if errors.empty?

lib/redis_client/cluster/pipeline.rb

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,52 @@ class Cluster
88
class Pipeline
99
ReplySizeError = Class.new(::RedisClient::Error)
1010

11-
def initialize(router)
11+
def initialize(router, command_builder)
1212
@router = router
13+
@command_builder = command_builder
1314
@grouped = Hash.new([].freeze)
1415
@size = 0
1516
end
1617

17-
def call(*command, **kwargs)
18-
node_key = @router.find_node_key(*command, primary_only: true, **kwargs)
19-
@grouped[node_key] += [[@size, :call, command, kwargs]]
18+
def call(*args, **kwargs)
19+
command = @command_builder.generate(args, kwargs)
20+
node_key = @router.find_node_key(command, primary_only: true)
21+
@grouped[node_key] += [[@size, :call_v, command]]
2022
@size += 1
2123
end
2224

23-
def call_once(*command, **kwargs)
24-
node_key = @router.find_node_key(*command, primary_only: true, **kwargs)
25-
@grouped[node_key] += [[@size, :call_once, command, kwargs]]
25+
def call_v(args)
26+
command = @command_builder.generate(args)
27+
node_key = @router.find_node_key(command, primary_only: true)
28+
@grouped[node_key] += [[@size, :call_v, command]]
2629
@size += 1
2730
end
2831

29-
def blocking_call(timeout, *command, **kwargs)
30-
node_key = @router.find_node_key(*command, primary_only: true, **kwargs)
31-
@grouped[node_key] += [[@size, :blocking_call, timeout, command, kwargs]]
32+
def call_once(*args, **kwargs)
33+
command = @command_builder.generate(args, kwargs)
34+
node_key = @router.find_node_key(command, primary_only: true)
35+
@grouped[node_key] += [[@size, :call_once_v, command]]
36+
@size += 1
37+
end
38+
39+
def call_once_v(args)
40+
command = @command_builder.generate(args)
41+
node_key = @router.find_node_key(command, primary_only: true)
42+
@grouped[node_key] += [[@size, :call_once_v, command]]
43+
@size += 1
44+
end
45+
46+
def blocking_call(timeout, *args, **kwargs)
47+
command = @command_builder.generate(args, kwargs)
48+
node_key = @router.find_node_key(command, primary_only: true)
49+
@grouped[node_key] += [[@size, :blocking_call_v, timeout, command]]
50+
@size += 1
51+
end
52+
53+
def blocking_call_v(timeout, args)
54+
command = @command_builder.generate(args)
55+
node_key = @router.find_node_key(command, primary_only: true)
56+
@grouped[node_key] += [[@size, :blocking_call_v, timeout, command]]
3257
@size += 1
3358
end
3459

@@ -37,20 +62,15 @@ def empty?
3762
end
3863

3964
# TODO: https://github.com/redis-rb/redis-cluster-client/issues/37 handle redirections
40-
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
65+
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
4166
all_replies = Array.new(@size)
4267
errors = {}
4368
threads = @grouped.map do |k, v|
4469
Thread.new(@router, k, v) do |router, node_key, rows|
4570
Thread.pass
4671
replies = router.find_node(node_key).pipelined do |pipeline|
47-
rows.each do |row|
48-
case row[1]
49-
when :call then pipeline.call(*row[2], **row[3])
50-
when :call_once then pipeline.call_once(*row[2], **row[3])
51-
when :blocking_call then pipeline.blocking_call(row[2], *row[3], **row[4])
52-
else raise NotImplementedError, row[1]
53-
end
72+
rows.each do |(_size, *row)|
73+
pipeline.send(*row)
5474
end
5575
end
5676

lib/redis_client/cluster/pub_sub.rb

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,24 @@
33
class RedisClient
44
class Cluster
55
class PubSub
6-
def initialize(router)
6+
def initialize(router, command_builder)
77
@router = router
8+
@command_builder = command_builder
89
@pubsub = nil
910
end
1011

11-
def call(*command, **kwargs)
12+
def call(*args, **kwargs)
1213
close
13-
@pubsub = @router.assign_node(*command, **kwargs).pubsub
14-
@pubsub.call(*command, **kwargs)
14+
command = @command_builder.generate(args, kwargs)
15+
@pubsub = @router.assign_node(command).pubsub
16+
@pubsub.call_v(command)
17+
end
18+
19+
def call_v(command)
20+
close
21+
command = @command_builder.generate(command)
22+
@pubsub = @router.assign_node(command).pubsub
23+
@pubsub.call_v(command)
1524
end
1625

1726
def close

0 commit comments

Comments
 (0)