Skip to content

Commit 9b0fbaa

Browse files
authored
Add pubsub feature (#25)
1 parent 376e063 commit 9b0fbaa

File tree

3 files changed

+45
-7
lines changed

3 files changed

+45
-7
lines changed

lib/redis_client/cluster.rb

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,28 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
6565
end
6666
end
6767

68+
class PubSub
69+
def initialize(client)
70+
@client = client
71+
@pubsub = nil
72+
end
73+
74+
def call(*command, **kwargs)
75+
close
76+
@pubsub = @client.send(:assign_node, *command).pubsub
77+
@pubsub.call(*command, **kwargs)
78+
end
79+
80+
def close
81+
@pubsub&.close
82+
@pubsub = nil
83+
end
84+
85+
def next_event(timeout = nil)
86+
@pubsub&.next_event(timeout)
87+
end
88+
end
89+
6890
ZERO_CURSOR_FOR_SCAN = '0'
6991
CMD_SCAN = 'SCAN'
7092
CMD_SSCAN = 'SSCAN'
@@ -135,7 +157,7 @@ def pipelined
135157
end
136158

137159
def pubsub
138-
# TODO: impl
160+
::RedisClient::Cluster::PubSub.new(self)
139161
end
140162

141163
def close

lib/redis_client/cluster/node.rb

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,6 @@ def call_replica(method, *command, **kwargs, &block)
129129
end.values
130130
end
131131

132-
# TODO: impl
133-
def process_all(commands, &block)
134-
try_map { |_, client| client.process(commands, &block) }.values
135-
end
136-
137132
def scale_reading_clients
138133
clients = @clients.select do |node_key, _|
139134
replica_disabled? ? primary?(node_key) : replica?(node_key)

test/redis_client/test_cluster.rb

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
class RedisClient
88
class TestCluster
9-
module Mixin
9+
module Mixin # rubocop:disable Metrics/ModuleLength
1010
include TestingHelper
1111

1212
def setup
@@ -116,6 +116,27 @@ def test_pipelined
116116
assert_equal(want, got)
117117
end
118118

119+
def test_pubsub
120+
(0..9).each do |i|
121+
pubsub = @client.pubsub
122+
pubsub.call('SUBSCRIBE', "channel#{i}")
123+
assert_equal(['subscribe', "channel#{i}", 1], pubsub.next_event(0.1))
124+
end
125+
126+
sub = Fiber.new do |client|
127+
channel = 'my-channel'
128+
pubsub = client.pubsub
129+
pubsub.call('SUBSCRIBE', channel)
130+
assert_equal(['subscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
131+
Fiber.yield(channel)
132+
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
133+
end
134+
135+
channel = sub.resume(@client)
136+
@client.call('PUBLISH', channel, 'hello world')
137+
assert_equal(['message', channel, 'hello world'], sub.resume)
138+
end
139+
119140
def test_close
120141
assert_nil(@client.close)
121142
end

0 commit comments

Comments
 (0)