Skip to content

Commit 60f5320

Browse files
committed
Stabilize the jruby test suite
Some threads would stay subscribed to various channel causing state leak across threads. It's still not perfect but should be much more stable.
1 parent 325da9d commit 60f5320

File tree

4 files changed

+71
-51
lines changed

4 files changed

+71
-51
lines changed

lib/redis/distributed.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ def quit
6666
on_each_node :quit
6767
end
6868

69+
def close
70+
on_each_node :close
71+
end
72+
6973
# Asynchronously save the dataset to disk.
7074
def bgsave
7175
on_each_node :bgsave

lib/redis/subscribe.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ def punsubscribe(*channels)
3434
call_v([:punsubscribe, *channels])
3535
end
3636

37+
def close
38+
@client.close
39+
end
40+
3741
protected
3842

3943
def subscription(start, stop, channels, block, timeout = 0)

test/helper.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def setup
116116
end
117117

118118
def teardown
119-
redis&.quit
119+
redis&.close
120120
super
121121
end
122122

test/redis/publish_subscribe_test.rb

Lines changed: 62 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,35 @@
55
class TestPublishSubscribe < Minitest::Test
66
include Helper::Client
77

8+
def setup
9+
@threads = {}
10+
super
11+
end
12+
13+
def teardown
14+
super
15+
@threads.each do |thread, redis|
16+
if redis.subscribed?
17+
redis.unsubscribe
18+
redis.punsubscribe
19+
end
20+
redis.close
21+
begin
22+
thread.join(2) or warn("leaked thread")
23+
rescue RedisClient::ConnectionError
24+
end
25+
end
26+
end
27+
828
class TestError < StandardError
929
end
1030

1131
def test_subscribe_and_unsubscribe
1232
@subscribed = false
1333
@unsubscribed = false
1434

15-
thread = Thread.new do
16-
r.subscribe("foo") do |on|
35+
thread = new_thread do |r|
36+
r.subscribe(channel_name) do |on|
1737
on.subscribe do |_channel, total|
1838
@subscribed = true
1939
@t1 = total
@@ -36,8 +56,7 @@ def test_subscribe_and_unsubscribe
3656
# Wait until the subscription is active before publishing
3757
Thread.pass until @subscribed
3858

39-
Redis.new(OPTIONS).publish("foo", "s1")
40-
59+
redis.publish(channel_name, "s1")
4160
thread.join
4261

4362
assert @subscribed
@@ -51,8 +70,8 @@ def test_psubscribe_and_punsubscribe
5170
@subscribed = false
5271
@unsubscribed = false
5372

54-
thread = Thread.new do
55-
r.psubscribe("f*") do |on|
73+
thread = new_thread do |r|
74+
r.psubscribe("channel:*") do |on|
5675
on.psubscribe do |_pattern, total|
5776
@subscribed = true
5877
@t1 = total
@@ -74,9 +93,7 @@ def test_psubscribe_and_punsubscribe
7493

7594
# Wait until the subscription is active before publishing
7695
Thread.pass until @subscribed
77-
78-
Redis.new(OPTIONS).publish("foo", "s1")
79-
96+
redis.publish(channel_name, "s1")
8097
thread.join
8198

8299
assert @subscribed
@@ -86,56 +103,37 @@ def test_psubscribe_and_punsubscribe
86103
assert_equal "s1", @message
87104
end
88105

89-
def test_pubsub_with_numpat_subcommand
90-
@subscribed = false
91-
thread = Thread.new do
92-
r.psubscribe("f*") do |on|
93-
on.psubscribe { |_channel, _total| @subscribed = true }
94-
on.pmessage { |_pattern, _channel, _message| r.punsubscribe }
95-
end
96-
end
97-
Thread.pass until @subscribed
98-
redis = Redis.new(OPTIONS)
99-
numpat_result = redis.pubsub(:numpat)
100-
101-
redis.publish("foo", "s1")
102-
thread.join
103-
104-
assert_equal redis.pubsub(:numpat), 0
105-
assert_equal numpat_result, 1
106-
end
107-
108106
def test_pubsub_with_channels_and_numsub_subcommnads
109107
@subscribed = false
110-
thread = Thread.new do
111-
r.subscribe("foo") do |on|
108+
thread = new_thread do |r|
109+
r.subscribe(channel_name) do |on|
112110
on.subscribe { |_channel, _total| @subscribed = true }
113111
on.message { |_channel, _message| r.unsubscribe }
114112
end
115113
end
116114
Thread.pass until @subscribed
117-
redis = Redis.new(OPTIONS)
118115
channels_result = redis.pubsub(:channels)
119116
channels_result.delete('__sentinel__:hello')
120-
numsub_result = redis.pubsub(:numsub, 'foo', 'boo')
117+
numsub_result = redis.pubsub(:numsub, channel_name, 'boo')
121118

122-
redis.publish("foo", "s1")
119+
redis.publish(channel_name, "s1")
123120
thread.join
124121

125-
assert_equal channels_result, ['foo']
126-
assert_equal numsub_result, ['foo', 1, 'boo', 0]
122+
assert_includes channels_result, channel_name
123+
assert_equal [channel_name, 1, 'boo', 0], numsub_result
127124
end
128125

129126
def test_subscribe_connection_usable_after_raise
130127
@subscribed = false
131128

132-
thread = Thread.new do
133-
r.subscribe("foo") do |on|
129+
thread = new_thread do |r|
130+
r.subscribe(channel_name) do |on|
134131
on.subscribe do |_channel, _total|
135132
@subscribed = true
136133
end
137134

138135
on.message do |_channel, _message|
136+
r.unsubscribe
139137
raise TestError
140138
end
141139
end
@@ -145,7 +143,7 @@ def test_subscribe_connection_usable_after_raise
145143
# Wait until the subscription is active before publishing
146144
Thread.pass until @subscribed
147145

148-
Redis.new(OPTIONS).publish("foo", "s1")
146+
redis.publish(channel_name, "s1")
149147

150148
thread.join
151149

@@ -155,8 +153,8 @@ def test_subscribe_connection_usable_after_raise
155153
def test_psubscribe_connection_usable_after_raise
156154
@subscribed = false
157155

158-
thread = Thread.new do
159-
r.psubscribe("f*") do |on|
156+
thread = new_thread do |r|
157+
r.psubscribe("channel:*") do |on|
160158
on.psubscribe do |_pattern, _total|
161159
@subscribed = true
162160
end
@@ -171,7 +169,7 @@ def test_psubscribe_connection_usable_after_raise
171169
# Wait until the subscription is active before publishing
172170
Thread.pass until @subscribed
173171

174-
Redis.new(OPTIONS).publish("foo", "s1")
172+
redis.publish(channel_name, "s1")
175173

176174
thread.join
177175

@@ -181,34 +179,34 @@ def test_psubscribe_connection_usable_after_raise
181179
def test_subscribe_within_subscribe
182180
@channels = []
183181

184-
thread = Thread.new do
185-
r.subscribe("foo") do |on|
182+
thread = new_thread do |r|
183+
r.subscribe(channel_name) do |on|
186184
on.subscribe do |channel, _total|
187185
@channels << channel
188186

189-
r.subscribe("bar") if channel == "foo"
187+
r.subscribe("bar") if channel == channel_name
190188
r.unsubscribe if channel == "bar"
191189
end
192190
end
193191
end
194192

195193
thread.join
196194

197-
assert_equal ["foo", "bar"], @channels
195+
assert_equal [channel_name, "bar"], @channels
198196
end
199197

200198
def test_other_commands_within_a_subscribe
201-
r.subscribe("foo") do |on|
199+
r.subscribe(channel_name) do |on|
202200
on.subscribe do |_channel, _total|
203201
r.set("bar", "s2")
204-
r.unsubscribe("foo")
202+
r.unsubscribe(channel_name)
205203
end
206204
end
207205
end
208206

209207
def test_subscribe_without_a_block
210208
assert_raises LocalJumpError do
211-
r.subscribe("foo")
209+
r.subscribe(channel_name)
212210
end
213211
end
214212

@@ -245,7 +243,7 @@ def test_subscribe_past_a_timeout
245243
def test_subscribe_with_timeout
246244
received = false
247245

248-
r.subscribe_with_timeout(LOW_TIMEOUT, "foo") do |on|
246+
r.subscribe_with_timeout(LOW_TIMEOUT, channel_name) do |on|
249247
on.message do |_channel, _message|
250248
received = true
251249
end
@@ -257,12 +255,26 @@ def test_subscribe_with_timeout
257255
def test_psubscribe_with_timeout
258256
received = false
259257

260-
r.psubscribe_with_timeout(LOW_TIMEOUT, "f*") do |on|
258+
r.psubscribe_with_timeout(LOW_TIMEOUT, "channel:*") do |on|
261259
on.message do |_channel, _message|
262260
received = true
263261
end
264262
end
265263

266264
refute received
267265
end
266+
267+
private
268+
269+
def new_thread(&block)
270+
redis = Redis.new(OPTIONS)
271+
thread = Thread.new(redis, &block)
272+
thread.report_on_exception = false
273+
@threads[thread] = redis
274+
thread
275+
end
276+
277+
def channel_name
278+
@channel_name ||= "channel:#{rand}"
279+
end
268280
end

0 commit comments

Comments
 (0)