Skip to content

Commit 970dd0f

Browse files
ioquatixeregon
authored andcommitted
Revert back to shared implementation of "locals" storage array.
1 parent 3046539 commit 970dd0f

File tree

3 files changed

+266
-21
lines changed

3 files changed

+266
-21
lines changed

lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,40 @@
11
require 'concurrent/constants'
2+
require_relative 'locals'
23

34
module Concurrent
45
class FiberLocalVar
6+
LOCALS = FiberLocals.new(:concurrent_fiber_local_var)
7+
8+
# @!macro fiber_local_var_method_initialize
9+
#
10+
# Creates a fiber local variable.
11+
#
12+
# @param [Object] default the default value when otherwise unset
13+
# @param [Proc] default_block Optional block that gets called to obtain the
14+
# default value for each fiber
15+
16+
# @!macro fiber_local_var_method_get
17+
#
18+
# Returns the value in the current fiber's copy of this fiber-local variable.
19+
#
20+
# @return [Object] the current value
21+
22+
# @!macro fiber_local_var_method_set
23+
#
24+
# Sets the current fiber's copy of this fiber-local variable to the specified value.
25+
#
26+
# @param [Object] value the value to set
27+
# @return [Object] the new value
28+
29+
# @!macro fiber_local_var_method_bind
30+
#
31+
# Bind the given value to fiber local storage during
32+
# execution of the given block.
33+
#
34+
# @param [Object] value the value to bind
35+
# @yield the operation to be performed with the bound variable
36+
# @return [Object] the value
37+
538
def initialize(default = nil, &default_block)
639
if default && block_given?
740
raise ArgumentError, "Cannot use both value and block as default value"
@@ -15,20 +48,20 @@ def initialize(default = nil, &default_block)
1548
@default = default
1649
end
1750

18-
@name = :"concurrent_variable_#{object_id}"
51+
@index = LOCALS.next_index(self)
1952
end
2053

21-
# @!macro thread_local_var_method_get
54+
# @!macro fiber_local_var_method_get
2255
def value
23-
Thread.current.fetch(@name) {default}
56+
LOCALS.fetch(@index) {default}
2457
end
2558

26-
# @!macro thread_local_var_method_set
59+
# @!macro fiber_local_var_method_set
2760
def value=(value)
28-
Thread.current[@name] = value
61+
LOCALS.set(@index, value)
2962
end
3063

31-
# @!macro thread_local_var_method_bind
64+
# @!macro fiber_local_var_method_bind
3265
def bind(value, &block)
3366
if block_given?
3467
old_value = self.value
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
require 'concurrent/constants'
2+
3+
module Concurrent
4+
# @!visibility private
5+
# @!macro internal_implementation_note
6+
#
7+
# An abstract implementation of local storage, with sub-classes for
8+
# per-thread and per-fiber locals.
9+
#
10+
# Each execution context (EC, thread or fiber) has a lazily initialized array
11+
# of local variable values. Each time a new local variable is created, we
12+
# allocate an "index" for it.
13+
#
14+
# For example, if the allocated index is 1, that means slot #1 in EVERY EC's
15+
# locals array will be used for the value of that variable.
16+
#
17+
# The good thing about using a per-EC structure to hold values, rather than
18+
# a global, is that no synchronization is needed when reading and writing
19+
# those values (since the structure is only ever accessed by a single
20+
# thread).
21+
#
22+
# Of course, when a local variable is GC'd, 1) we need to recover its index
23+
# for use by other new local variables (otherwise the locals arrays could
24+
# get bigger and bigger with time), and 2) we need to null out all the
25+
# references held in the now-unused slots (both to avoid blocking GC of those
26+
# objects, and also to prevent "stale" values from being passed on to a new
27+
# local when the index is reused).
28+
#
29+
# Because we need to null out freed slots, we need to keep references to
30+
# ALL the locals arrays, so we can null out the appropriate slots in all of
31+
# them. This is why we need to use a finalizer to clean up the locals array
32+
# when the EC goes out of scope.
33+
class AbstractLocals
34+
def initialize(name_prefix = :concurrent_locals)
35+
@free = []
36+
@lock = Mutex.new
37+
@all_locals = {}
38+
@next = 0
39+
40+
@name = :"#{name_prefix}_#{object_id}"
41+
end
42+
43+
def synchronize
44+
@lock.synchronize { yield }
45+
end
46+
47+
if Concurrent.on_cruby?
48+
def weak_synchronize
49+
yield
50+
end
51+
else
52+
alias_method :weak_synchronize, :synchronize
53+
end
54+
55+
def next_index(target)
56+
index = synchronize do
57+
if @free.empty?
58+
@next += 1
59+
else
60+
@free.pop
61+
end
62+
end
63+
64+
# When the target goes out of scope, we should free the associated index
65+
# and all values stored into it.
66+
ObjectSpace.define_finalizer(target, target_finalizer(index))
67+
68+
return index
69+
end
70+
71+
def free_index(index)
72+
weak_synchronize do
73+
# The cost of GC'ing a TLV is linear in the number of ECs using local
74+
# variables. But that is natural! More ECs means more storage is used
75+
# per local variable. So naturally more CPU time is required to free
76+
# more storage.
77+
#
78+
# DO NOT use each_value which might conflict with new pair assignment
79+
# into the hash in #set method.
80+
@all_locals.values.each do |locals|
81+
locals[index] = nil
82+
end
83+
84+
# free index has to be published after the arrays are cleared:
85+
@free << index
86+
end
87+
end
88+
89+
def fetch(index, default = nil)
90+
if locals = self.locals
91+
value = locals[index]
92+
end
93+
94+
if value.nil?
95+
if block_given?
96+
yield
97+
else
98+
default
99+
end
100+
elsif value.equal?(NULL)
101+
nil
102+
else
103+
value
104+
end
105+
end
106+
107+
def set(index, value)
108+
locals = self.locals!
109+
locals[index] = (value.nil? ? NULL : value)
110+
111+
value
112+
end
113+
114+
private
115+
116+
# When the target index goes out of scope, clean up that slot across all locals currently assigned.
117+
def target_finalizer(index)
118+
proc do
119+
free_index(index)
120+
end
121+
end
122+
123+
# When a target (locals) goes out of scope, delete the locals from all known locals.
124+
def locals_finalizer(locals_object_id)
125+
proc do |locals_id|
126+
weak_synchronize do
127+
@all_locals.delete(locals_object_id)
128+
end
129+
end
130+
end
131+
132+
# Returns the locals for the current scope, or nil if none exist.
133+
def locals
134+
raise NotImplementedError
135+
end
136+
137+
# Returns the locals for the current scope, creating them if necessary.
138+
def locals!
139+
raise NotImplementedError
140+
end
141+
end
142+
143+
# @!visibility private
144+
# @!macro internal_implementation_note
145+
# An array-backed storage of indexed variables per thread.
146+
class ThreadLocals < AbstractLocals
147+
def locals
148+
Thread.current.thread_variable_get(@name)
149+
end
150+
151+
def locals!
152+
thread = Thread.current
153+
locals = thread.thread_variable_get(@name)
154+
155+
unless locals
156+
locals = thread.thread_variable_set(@name, [])
157+
weak_synchronize do
158+
@all_locals[locals.object_id] = locals
159+
# When the thread goes out of scope, we should delete the associated locals:
160+
ObjectSpace.define_finalizer(thread, locals_finalizer(locals.object_id))
161+
end
162+
end
163+
164+
return locals
165+
end
166+
end
167+
168+
# @!visibility private
169+
# @!macro internal_implementation_note
170+
# An array-backed storage of indexed variables per fiber.
171+
class FiberLocals < AbstractLocals
172+
def locals
173+
Thread.current[@name]
174+
end
175+
176+
def locals!
177+
thread = Thread.current
178+
locals = thread[@name]
179+
180+
unless locals
181+
locals = thread[@name] = []
182+
weak_synchronize do
183+
@all_locals[locals.object_id] = locals
184+
# When the thread goes out of scope, we should delete the associated locals:
185+
ObjectSpace.define_finalizer(Fiber.current, locals_finalizer(locals.object_id))
186+
end
187+
end
188+
189+
return locals
190+
end
191+
end
192+
end

lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,42 @@
11
require 'concurrent/constants'
2+
require_relative 'locals'
23

34
module Concurrent
45

56
# @!macro thread_local_var
67
class ThreadLocalVar
8+
LOCALS = ThreadLocals.new(:concurrent_fiber_local_var)
9+
710
# @!macro thread_local_var_method_initialize
11+
#
12+
# Creates a thread local variable.
13+
#
14+
# @param [Object] default the default value when otherwise unset
15+
# @param [Proc] default_block Optional block that gets called to obtain the
16+
# default value for each thread
17+
18+
# @!macro thread_local_var_method_get
19+
#
20+
# Returns the value in the current thread's copy of this thread-local variable.
21+
#
22+
# @return [Object] the current value
23+
24+
# @!macro thread_local_var_method_set
25+
#
26+
# Sets the current thread's copy of this thread-local variable to the specified value.
27+
#
28+
# @param [Object] value the value to set
29+
# @return [Object] the new value
30+
31+
# @!macro thread_local_var_method_bind
32+
#
33+
# Bind the given value to thread local storage during
34+
# execution of the given block.
35+
#
36+
# @param [Object] value the value to bind
37+
# @yield the operation to be performed with the bound variable
38+
# @return [Object] the value
39+
840
def initialize(default = nil, &default_block)
941
if default && block_given?
1042
raise ArgumentError, "Cannot use both value and block as default value"
@@ -18,29 +50,17 @@ def initialize(default = nil, &default_block)
1850
@default = default
1951
end
2052

21-
@name = :"concurrent_variable_#{object_id}"
53+
@index = LOCALS.next_index(self)
2254
end
2355

2456
# @!macro thread_local_var_method_get
2557
def value
26-
value = Thread.current.thread_variable_get(@name)
27-
28-
if value.nil?
29-
default
30-
elsif value.equal?(NULL)
31-
nil
32-
else
33-
value
34-
end
58+
LOCALS.fetch(@index) {default}
3559
end
3660

3761
# @!macro thread_local_var_method_set
3862
def value=(value)
39-
if value.nil?
40-
value = NULL
41-
end
42-
43-
Thread.current.thread_variable_set(@name, value)
63+
LOCALS.set(@index, value)
4464
end
4565

4666
# @!macro thread_local_var_method_bind

0 commit comments

Comments
 (0)