Skip to content

Commit 863cb51

Browse files
committed
SSSSSSSSSSSSSSSSSSSSSSSSSSSSMOKIN' FAST new implementation of thread locals
The previous implementation of thread-local variables would acquire a lock EVERY time you read OR wrote a TLV. This completely eliminates the scalability advantage of using thread-locals. This implementation uses one array per thread to hold the values of thread-locals. Obviously, the more thread-locals you create, the bigger the arrays will be. Since each array is accessed by one thread only, no locking is required to read/write a TLV.
1 parent faa76ae commit 863cb51

File tree

5 files changed

+217
-279
lines changed

5 files changed

+217
-279
lines changed

examples/thread_local_memory_usage.rb

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#!/usr/bin/env ruby
2+
3+
$: << File.expand_path('../../lib', __FILE__)
4+
5+
$DEBUG_TLV = true
6+
require 'concurrent'
7+
require 'concurrent/atomic/thread_local_var'
8+
require 'benchmark'
9+
require 'thread'
10+
11+
include Concurrent
12+
13+
# if we hold on to vars, but threads die, space used for TLVs should be recovered
14+
15+
# Spawns 500 short-lived threads; each one assigns 1 to every element of
# +vars+ through its +value=+ writer, and all threads are joined before
# returning.
#
# @param vars [Enumerable] objects responding to +value=+
# @return [Array<Thread>] the joined threads
def test_thread_gc(vars)
  workers = Array.new(500) do
    Thread.new { vars.each { |tlv| tlv.value = 1 } }
  end
  workers.each(&:join)
end
25+
26+
# Print GC statistics, run the thread-GC scenario 200 times (forcing a GC
# after each round), then print the statistics again.
#
# GC.stat keys were renamed in Ruby 2.2 (heap_length -> heap_sorted_length,
# malloc_increase -> malloc_increase_bytes); prefer the new names and fall
# back to the old ones so the report is not blank on either side.
puts "BEFORE THREAD GC TEST:"
stats = GC.stat
puts "Ruby heap pages: #{stats[:heap_sorted_length] || stats[:heap_length]}, Other malloc'd bytes: #{stats[:malloc_increase_bytes] || stats[:malloc_increase]}"

vars = 500.times.collect { ThreadLocalVar.new(0) }
200.times do
  test_thread_gc(vars)
  GC.start
end

puts "AFTER THREAD GC TEST:"
stats = GC.stat
puts "Ruby heap pages: #{stats[:heap_sorted_length] || stats[:heap_length]}, Other malloc'd bytes: #{stats[:malloc_increase_bytes] || stats[:malloc_increase]}"
37+
38+
# if we hold on to threads, but drop TLVs, space used should be reused by allocated TLVs
39+
40+
# Pops items off +queue+ and assigns 1 to each through its +value=+ writer,
# stopping as soon as a nil item is popped.
#
# @param queue [#pop] source of objects responding to +value=+
# @return [nil]
def tlv_gc_test_loop(queue)
  until (item = queue.pop).nil?
    item.value = 1
  end
end
47+
48+
# Creates 500 fresh ThreadLocalVar instances (initial value 0) and appends
# each one to every queue in +queues+.
#
# @param queues [Enumerable] collections responding to +<<+
# @return [Integer] 500 (the receiver of +times+)
def test_tlv_gc(queues)
  500.times do
    fresh = ThreadLocalVar.new(0)
    queues.each do |queue|
      queue << fresh
    end
  end
end
54+
55+
# Print GC statistics, start one worker thread per queue, run the TLV-GC
# scenario 200 times (forcing a GC after each round), shut the workers down
# with a nil sentinel, then print the statistics again.
#
# GC.stat keys were renamed in Ruby 2.2 (heap_length -> heap_sorted_length,
# malloc_increase -> malloc_increase_bytes); prefer the new names and fall
# back to the old ones so the report is not blank on either side.
puts
puts "BEFORE TLV GC TEST:"
stats = GC.stat
puts "Ruby heap pages: #{stats[:heap_sorted_length] || stats[:heap_length]}, Other malloc'd bytes: #{stats[:malloc_increase_bytes] || stats[:malloc_increase]}"

queues = 500.times.collect { Queue.new }
threads = queues.map do |queue|
  Thread.new do
    tlv_gc_test_loop(queue)
  end
end

200.times do
  test_tlv_gc(queues)
  GC.start
end
# A nil item tells each worker loop to return.
queues.each { |q| q << nil }
threads.each(&:join)

puts "AFTER TLV GC TEST:"
stats = GC.stat
puts "Ruby heap pages: #{stats[:heap_sorted_length] || stats[:heap_length]}, Other malloc'd bytes: #{stats[:malloc_increase_bytes] || stats[:malloc_increase]}"

examples/thread_local_var_bench.rb

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#!/usr/bin/env ruby
2+
3+
$: << File.expand_path('../../lib', __FILE__)
4+
5+
require 'concurrent'
6+
require 'concurrent/atomic/thread_local_var'
7+
require 'benchmark'
8+
9+
include Concurrent
10+
11+
N_THREADS = 100
12+
N_VARS = 100
13+
14+
vars = N_VARS.times.collect { ThreadLocalVar.new(0) }
15+
16+
# Starts N_THREADS threads; each performs 10000 read-increment-write cycles
# on an element of +vars+ chosen at a random index below N_VARS. All threads
# are joined before returning.
#
# @param vars [Array] objects responding to +value+ and +value=+
# @return [Array<Thread>] the joined threads
def test_threadlocal_perf(vars)
  workers = Array.new(N_THREADS) do
    Thread.new do
      10000.times do
        tlv = vars[rand(N_VARS)]
        tlv.value += 1
      end
    end
  end
  workers.each(&:join)
end
28+
29+
Benchmark.bmbm do |bm|
30+
bm.report('ThreadLocalVar') { test_threadlocal_perf(vars) }
31+
end

lib/concurrent/atomic/thread_local_var.rb

Lines changed: 112 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
require 'concurrent/atomic/thread_local_var/weak_key_map'
1+
require 'thread'
22

33
module Concurrent
44

@@ -70,7 +70,7 @@ def value
7070
# @!macro [attach] thread_local_var_method_set
7171
#
7272
# Sets the current thread's copy of this thread-local variable to the specified value.
73-
#
73+
#
7474
# @param [Object] value the value to set
7575
# @return [Object] the new value
7676
def value=(value)
@@ -81,7 +81,7 @@ def value=(value)
8181
#
8282
# Bind the given value to thread local storage during
8383
# execution of the given block.
84-
#
84+
#
8585
# @param [Object] value the value to bind
8686
# @yield the operation to be performed with the bound variable
8787
# @return [Object] the value
@@ -119,29 +119,129 @@ def set(value)
119119
# @!macro internal_implementation_note
120120
class RubyThreadLocalVar < AbstractThreadLocalVar
121121

122+
# Each thread has a (lazily initialized) array of thread-local variable values
123+
# Each time a new thread-local var is created, we allocate an "index" for it
124+
# For example, if the allocated index is 1, that means slot #1 in EVERY
125+
# thread's thread-local array will be used for the value of that TLV
126+
#
127+
# The good thing about using a per-THREAD structure to hold values, rather
128+
# than a per-TLV structure, is that no synchronization is needed when
129+
# reading and writing those values (since the structure is only ever
130+
# accessed by a single thread)
131+
#
132+
# Of course, when a TLV is GC'd, 1) we need to recover its index for use
133+
# by other new TLVs (otherwise the thread-local arrays could get bigger
134+
# and bigger with time), and 2) we need to null out all the references
135+
# held in the now-unused slots (both to avoid blocking GC of those objects,
136+
# and also to prevent "stale" values from being passed on to a new TLV
137+
# when the index is reused)
138+
# Because we need to null out freed slots, we need to keep references to
139+
# ALL the thread-local arrays -- ARRAYS is for that
140+
# But when a Thread is GC'd, we need to drop the reference to its thread-local
141+
# array, so we don't leak memory
142+
143+
FREE = []
144+
LOCK = Mutex.new
145+
ARRAYS = {} # used as a hash set
146+
@@next = 0
147+
122148
protected
123149

124150
# @!visibility private
125-
def allocate_storage
126-
@storage = WeakKeyMap.new
151+
# Builds the finalizer proc for a thread-local variable that owns slot
# +index+. When called, the proc (while holding LOCK) puts +index+ back on
# the FREE list and sets that slot to nil in every array registered in
# ARRAYS, so the work done is proportional to the number of registered arrays.
#
# NOTE(review): the proc acquires LOCK; confirm a finalizer can never run on
# a thread that is already inside LOCK.synchronize.
#
# @param index [Integer] the slot to release
# @return [Proc]
def self.threadlocal_finalizer(index)
  proc do
    LOCK.synchronize do
      FREE.push(index)
      ARRAYS.each_value { |slots| slots[index] = nil }
    end
  end
end
128164

129165
# @!visibility private
130-
def get
131-
@storage[Thread.current]
166+
# Builds the finalizer proc for a thread whose thread-local array is
# +array+. When called, the proc removes the entry keyed by the array's
# object_id from ARRAYS (under LOCK) so the registry stops referencing it.
#
# @param array [Array] the thread-local array to unregister
# @return [Proc]
def self.thread_finalizer(array)
  proc do
    LOCK.synchronize { ARRAYS.delete(array.object_id) }
  end
end
133175

134-
# @!visibility private
135-
def set(value)
136-
key = Thread.current
176+
# Picks the slot index for this variable and registers a finalizer for it.
# Under LOCK, an index is taken from the FREE list when one is available;
# otherwise the current @@next counter value is used and the counter is
# incremented. The finalizer is the proc built by threadlocal_finalizer for
# the chosen index.
def allocate_storage
  @index = LOCK.synchronize do
    recycled = FREE.pop
    if recycled
      recycled
    else
      fresh = @@next
      @@next = fresh + 1
      fresh
    end
  end
  ObjectSpace.define_finalizer(self, self.class.threadlocal_finalizer(@index))
end
137186

138-
@storage[key] = value
187+
public
139188

189+
# @!macro [attach] thread_local_var_method_get
190+
#
191+
# Returns the value in the current thread's copy of this thread-local variable.
192+
#
193+
# @return [Object] the current value
194+
# Reads this variable's slot from the current thread's
# :__threadlocal_array__ array.
#
# @return [Object] @default when the thread has no array yet or the slot is
#   nil; nil when the slot holds NIL_SENTINEL; otherwise the stored object
def value
  slots = Thread.current[:__threadlocal_array__]
  return @default unless slots

  stored = slots[@index]
  return @default if stored.nil?

  # NIL_SENTINEL stands in for an explicitly assigned nil.
  stored.equal?(NIL_SENTINEL) ? nil : stored
end
208+
209+
# @!macro [attach] thread_local_var_method_set
210+
#
211+
# Sets the current thread's copy of this thread-local variable to the specified value.
212+
#
213+
# @param [Object] value the value to set
214+
# @return [Object] the new value
215+
# Stores +value+ in this variable's slot of the current thread's
# :__threadlocal_array__ array. On a thread's first write the array is
# created, recorded in ARRAYS under its object_id (while holding LOCK), and
# a finalizer built by thread_finalizer is attached to the thread.
#
# @param [Object] value the value to set
# @return [Object] the new value
def value=(value)
  current = Thread.current
  slots = current[:__threadlocal_array__]
  unless slots
    slots = []
    current[:__threadlocal_array__] = slots
    LOCK.synchronize { ARRAYS[slots.object_id] = slots }
    ObjectSpace.define_finalizer(current, self.class.thread_finalizer(slots))
  end
  # A nil slot is read back as "unset", so an explicit nil is stored as NIL_SENTINEL.
  slots[@index] = value.nil? ? NIL_SENTINEL : value
  value
end
228+
229+
# @!macro [attach] thread_local_var_method_bind
230+
#
231+
# Bind the given value to thread local storage during
232+
# execution of the given block.
233+
#
234+
# @param [Object] value the value to bind
235+
# @yield the operation to be performed with the bound variable
236+
# @return [Object] the value
237+
# Binds +value+ to this variable for the duration of the given block, then
# writes back the value that was read before the block ran. The restore
# happens in an +ensure+ clause, so it also runs when the block raises.
#
# The leftover pre-change statement `@storage.delete(key)` is removed from
# the ensure clause: neither `@storage` nor `key` exists in this
# implementation, so it would raise before the old value could be restored.
#
# @param [Object] value the value to bind
# @yield the operation to be performed with the bound variable
# @return [Object, nil] the block's result, or nil when no block is given
def bind(value, &block)
  if block_given?
    old_value = self.value
    begin
      self.value = value
      yield
    ensure
      self.value = old_value
    end
  end
end

0 commit comments

Comments
 (0)