Skip to content

Commit 52bdfb7

Browse files
authored
fix: use fiber local storage instead of thread local storage (#185)
1 parent ee1c082 commit 52bdfb7

File tree

5 files changed

+38
-38
lines changed

5 files changed

+38
-38
lines changed

lib/redis_client/cluster/node.rb

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -85,24 +85,24 @@ def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/Cycl
8585
startup_nodes.each_slice(MAX_THREADS).with_index do |chuncked_startup_nodes, chuncked_idx|
8686
threads = chuncked_startup_nodes.each_with_index.map do |raw_client, idx|
8787
Thread.new(raw_client, (MAX_THREADS * chuncked_idx) + idx) do |cli, i|
88-
Thread.current.thread_variable_set(:index, i)
88+
Thread.current[:index] = i
8989
reply = cli.call('CLUSTER', 'NODES')
90-
Thread.current.thread_variable_set(:info, parse_cluster_node_reply(reply))
90+
Thread.current[:info] = parse_cluster_node_reply(reply)
9191
rescue StandardError => e
92-
Thread.current.thread_variable_set(:error, e)
92+
Thread.current[:error] = e
9393
ensure
9494
cli&.close
9595
end
9696
end
9797

9898
threads.each do |t|
9999
t.join
100-
if t.thread_variable?(:info)
100+
if t.key?(:info)
101101
node_info_list ||= Array.new(startup_size)
102-
node_info_list[t.thread_variable_get(:index)] = t.thread_variable_get(:info)
103-
elsif t.thread_variable?(:error)
102+
node_info_list[t[:index]] = t[:info]
103+
elsif t.key?(:error)
104104
errors ||= Array.new(startup_size)
105-
errors[t.thread_variable_get(:index)] = t.thread_variable_get(:error)
105+
errors[t[:index]] = t[:error]
106106
end
107107
end
108108
end
@@ -303,22 +303,22 @@ def try_map(clients) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComple
303303
clients.each_slice(MAX_THREADS) do |chuncked_clients|
304304
threads = chuncked_clients.map do |k, v|
305305
Thread.new(k, v) do |node_key, client|
306-
Thread.current.thread_variable_set(:node_key, node_key)
306+
Thread.current[:node_key] = node_key
307307
reply = yield(node_key, client)
308-
Thread.current.thread_variable_set(:result, reply)
308+
Thread.current[:result] = reply
309309
rescue StandardError => e
310-
Thread.current.thread_variable_set(:error, e)
310+
Thread.current[:error] = e
311311
end
312312
end
313313

314314
threads.each do |t|
315315
t.join
316-
if t.thread_variable?(:result)
316+
if t.key?(:result)
317317
results ||= {}
318-
results[t.thread_variable_get(:node_key)] = t.thread_variable_get(:result)
319-
elsif t.thread_variable?(:error)
318+
results[t[:node_key]] = t[:result]
319+
elsif t.key?(:error)
320320
errors ||= {}
321-
errors[t.thread_variable_get(:node_key)] = t.thread_variable_get(:error)
321+
errors[t[:node_key]] = t[:error]
322322
end
323323
end
324324
end

lib/redis_client/cluster/node/latency_replica.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def measure_latencies(clients) # rubocop:disable Metrics/AbcSize
4343
clients.each_slice(::RedisClient::Cluster::Node::MAX_THREADS).each_with_object({}) do |chuncked_clients, acc|
4444
threads = chuncked_clients.map do |k, v|
4545
Thread.new(k, v) do |node_key, client|
46-
Thread.current.thread_variable_set(:node_key, node_key)
46+
Thread.current[:node_key] = node_key
4747

4848
min = DUMMY_LATENCY_NSEC
4949
MEASURE_ATTEMPT_COUNT.times do
@@ -53,15 +53,15 @@ def measure_latencies(clients) # rubocop:disable Metrics/AbcSize
5353
min = duration if duration < min
5454
end
5555

56-
Thread.current.thread_variable_set(:latency, min)
56+
Thread.current[:latency] = min
5757
rescue StandardError
58-
Thread.current.thread_variable_set(:latency, DUMMY_LATENCY_NSEC)
58+
Thread.current[:latency] = DUMMY_LATENCY_NSEC
5959
end
6060
end
6161

6262
threads.each do |t|
6363
t.join
64-
acc[t.thread_variable_get(:node_key)] = t.thread_variable_get(:latency)
64+
acc[t[:node_key]] = t[:latency]
6565
end
6666
end
6767
end

lib/redis_client/cluster/pipeline.rb

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -150,34 +150,34 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
150150
@pipelines&.each_slice(MAX_THREADS) do |chuncked_pipelines|
151151
threads = chuncked_pipelines.map do |node_key, pipeline|
152152
Thread.new(node_key, pipeline) do |nk, pl|
153-
Thread.current.thread_variable_set(:node_key, nk)
153+
Thread.current[:node_key] = nk
154154
replies = do_pipelining(@router.find_node(nk), pl)
155155
raise ReplySizeError, "commands: #{pl._size}, replies: #{replies.size}" if pl._size != replies.size
156156

157-
Thread.current.thread_variable_set(:replies, replies)
157+
Thread.current[:replies] = replies
158158
rescue ::RedisClient::Cluster::Pipeline::RedirectionNeeded => e
159-
Thread.current.thread_variable_set(:redirection_needed, e)
159+
Thread.current[:redirection_needed] = e
160160
rescue StandardError => e
161-
Thread.current.thread_variable_set(:error, e)
161+
Thread.current[:error] = e
162162
end
163163
end
164164

165165
threads.each(&:join)
166166
threads.each do |t|
167-
if t.thread_variable?(:replies)
167+
if t.key?(:replies)
168168
all_replies ||= Array.new(@size)
169-
@pipelines[t.thread_variable_get(:node_key)]
169+
@pipelines[t[:node_key]]
170170
.outer_indices
171-
.each_with_index { |outer, inner| all_replies[outer] = t.thread_variable_get(:replies)[inner] }
172-
elsif t.thread_variable?(:redirection_needed)
171+
.each_with_index { |outer, inner| all_replies[outer] = t[:replies][inner] }
172+
elsif t.key?(:redirection_needed)
173173
all_replies ||= Array.new(@size)
174-
pipeline = @pipelines[t.thread_variable_get(:node_key)]
175-
err = t.thread_variable_get(:redirection_needed)
174+
pipeline = @pipelines[t[:node_key]]
175+
err = t[:redirection_needed]
176176
err.indices.each { |i| err.replies[i] = handle_redirection(err.replies[i], pipeline, i) }
177177
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = err.replies[inner] }
178-
elsif t.thread_variable?(:error)
178+
elsif t.key?(:error)
179179
errors ||= {}
180-
errors[t.thread_variable_get(:node_key)] = t.thread_variable_get(:error)
180+
errors[t[:node_key]] = t[:error]
181181
end
182182
end
183183
end

test/redis_client/cluster/test_normalized_cmd_name.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,21 +67,21 @@ def test_thread_safety
6767

6868
threads = attempts.each_with_index.map do |_, i|
6969
Thread.new do
70-
Thread.current.thread_variable_set(:index, i)
70+
Thread.current[:index] = i
7171
got = if i.even?
7272
@subject.get_by_command(%w[SET foo bar])
7373
else
7474
@subject.clear ? 'set' : 'clear failed'
7575
end
76-
Thread.current.thread_variable_set(:got, got)
76+
Thread.current[:got] = got
7777
rescue StandardError => e
78-
Thread.current.thread_variable_set(:got, "#{e.class.name}: #{e.message}")
78+
Thread.current[:got] = "#{e.class.name}: #{e.message}"
7979
end
8080
end
8181

8282
threads.each do |t|
8383
t.join
84-
attempts[t.thread_variable_get(:index)] = t.thread_variable_get(:got)
84+
attempts[t[:index]] = t[:got]
8585
end
8686

8787
attempts.each { |got| assert_equal('set', got) }

test/test_concurrency.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ def test_threading
5656
ATTEMPTS.times { MAX_THREADS.times { |i| @client.call('INCR', "key#{i}") } }
5757
ATTEMPTS.times { MAX_THREADS.times { |i| @client.call('DECR', "key#{i}") } }
5858
rescue StandardError => e
59-
Thread.current.thread_variable_set(:error, e)
59+
Thread.current[:error] = e
6060
end
6161
end
6262

6363
threads.each(&:join)
64-
threads.each { |t| assert_nil(t.thread_variable_get(:error)) }
64+
threads.each { |t| assert_nil(t[:error]) }
6565
MAX_THREADS.times { |i| assert_equal(WANT, @client.call('GET', "key#{i}")) }
6666
end
6767

@@ -71,12 +71,12 @@ def test_threading_with_pipelining
7171
@client.pipelined { |pi| ATTEMPTS.times { MAX_THREADS.times { |i| pi.call('INCR', "key#{i}") } } }
7272
@client.pipelined { |pi| ATTEMPTS.times { MAX_THREADS.times { |i| pi.call('DECR', "key#{i}") } } }
7373
rescue StandardError => e
74-
Thread.current.thread_variable_set(:error, e)
74+
Thread.current[:error] = e
7575
end
7676
end
7777

7878
threads.each(&:join)
79-
threads.each { |t| assert_nil(t.thread_variable_get(:error)) }
79+
threads.each { |t| assert_nil(t[:error]) }
8080
MAX_THREADS.times { |i| assert_equal(WANT, @client.call('GET', "key#{i}")) }
8181
end
8282

0 commit comments

Comments
 (0)