Skip to content

Commit a9da852

Browse files
committed
Introduce safe_initialization! class marker instead of ensure_ivar_visibility!
* ensures full memory barier is inserted only when needed * ensures that it's inserted only once * testable correctness with @FinalIvar convention
1 parent 0bd741c commit a9da852

25 files changed

+193
-83
lines changed

ext/com/concurrent_ruby/ext/SynchronizationLibrary.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,12 @@ public IRubyObject initialize(ThreadContext context) {
8787

8888
@JRubyMethod(name = "full_memory_barrier", visibility = Visibility.PRIVATE)
8989
public IRubyObject fullMemoryBarrier(ThreadContext context) {
90-
if (UnsafeHolder.U == null) {
91-
// We are screwed
92-
throw new UnsupportedOperationException();
93-
} else if (UnsafeHolder.SUPPORTS_FENCES)
90+
if (UnsafeHolder.U == null || !UnsafeHolder.SUPPORTS_FENCES) {
91+
throw new UnsupportedOperationException(
92+
"concurrent-ruby requires java with sun.mics.Unsafe fences support, such as Java 8. " +
93+
"Current version is: " + System.getProperty("java.version"));
94+
} else {
9495
UnsafeHolder.fullFence();
95-
else {
96-
// TODO (pitr 06-Sep-2015): enforce Java 8
97-
throw new UnsupportedOperationException();
9896
}
9997
return context.nil;
10098
}

lib/concurrent/agent.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ def initialize(initial, opts = {})
224224
#
225225
# @return [Object] the current value
226226
def value
227-
@current.value
227+
@current.value # TODO (pitr 12-Sep-2015): broken unsafe read?
228228
end
229229

230230
alias_method :deref, :value

lib/concurrent/atom.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ module Concurrent
6060
class Atom < Synchronization::Object
6161
include Concern::Observable
6262

63+
safe_initialization!
6364
private *attr_volatile_with_cas(:value)
6465
public :value
6566

@@ -78,7 +79,7 @@ class Atom < Synchronization::Object
7879
def initialize(value, opts = {})
7980
@Validator = opts.fetch(:validator, -> v { true })
8081
self.observers = Collection::CopyOnNotifyObserverSet.new
81-
super(value) # ensures visibility
82+
super(value)
8283
end
8384

8485
# @!method value

lib/concurrent/atomic/read_write_lock.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class ReadWriteLock < Synchronization::Object
4141
# @!visibility private
4242
MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1
4343

44+
safe_initialization!
45+
4446
# Implementation notes:
4547
# A goal is to make the uncontended path for both readers/writers lock-free
4648
# Only if there is reader-writer or writer-writer contention, should locks be used
@@ -54,10 +56,10 @@ class ReadWriteLock < Synchronization::Object
5456

5557
# Create a new `ReadWriteLock` in the unlocked state.
5658
def initialize
59+
super()
5760
@Counter = AtomicFixnum.new(0) # single integer which represents lock state
5861
@ReadLock = Synchronization::Lock.new
5962
@WriteLock = Synchronization::Lock.new
60-
super() # ensures visibility
6163
end
6264

6365
# Execute a block operation within a read lock.

lib/concurrent/atomic/reentrant_read_write_lock.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,15 @@ class ReentrantReadWriteLock < Synchronization::Object
9999
# @!visibility private
100100
WRITE_LOCK_MASK = MAX_WRITERS
101101

102+
safe_initialization!
103+
102104
# Create a new `ReentrantReadWriteLock` in the unlocked state.
103105
def initialize
106+
super()
104107
@Counter = AtomicFixnum.new(0) # single integer which represents lock state
105108
@ReadQueue = Synchronization::Lock.new # used to queue waiting readers
106109
@WriteQueue = Synchronization::Lock.new # used to queue waiting writers
107110
@HeldCount = ThreadLocalVar.new(0) # indicates # of R & W locks held by this thread
108-
super() # ensures visibility
109111
end
110112

111113
# Execute a block operation within a read lock.

lib/concurrent/channel/buffered_channel.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def initialize(size)
1414
end
1515

1616
def probe_set_size
17-
@probe_set.size
17+
@probe_set.size # TODO (pitr 12-Sep-2015): unsafe?
1818
end
1919

2020
def buffer_queue_size

lib/concurrent/delay.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def initialize(opts = {}, &block)
7474
#
7575
# @!macro delay_note_regarding_blocking
7676
def value(timeout = nil)
77-
if @executor
77+
if @executor # TODO (pitr 12-Sep-2015): broken unsafe read?
7878
super
7979
else
8080
# this function has been optimized for performance and

lib/concurrent/edge/future.rb

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def post_on(executor, *args, &job)
126126

127127
# Represents an event which will happen in future (will be completed). It has to always happen.
128128
class Event < Synchronization::LockableObject
129+
safe_initialization!
129130
include Concern::Deprecation
130131

131132
# @!visibility private
@@ -167,13 +168,15 @@ def to_sym
167168
COMPLETED = Completed.new
168169

169170
def initialize(promise, default_executor)
171+
super()
170172
@Promise = promise
171173
@DefaultExecutor = default_executor
172174
@Touched = AtomicBoolean.new(false)
173175
@Callbacks = LockFreeStack.new
174-
@Waiters = LockFreeStack.new # TODO replace with AtomicFixnum, avoid aba problem
176+
# TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem
177+
# TODO (pitr 12-Sep-2015): look at java.util.concurrent solution
178+
@Waiters = LockFreeStack.new
175179
@State = AtomicReference.new PENDING
176-
super() # ensures visibility
177180
end
178181

179182
# @return [:pending, :completed]
@@ -877,9 +880,11 @@ def hide_completable
877880
# @abstract
878881
# @!visibility private
879882
class AbstractPromise < Synchronization::Object
883+
safe_initialization!
884+
880885
def initialize(future)
881-
@Future = future
882886
super()
887+
@Future = future
883888
end
884889

885890
def future
@@ -1373,10 +1378,12 @@ def initialize(default_executor, intended_time)
13731378

13741379
# @note proof of concept
13751380
class Channel < Synchronization::Object
1381+
safe_initialization!
1382+
13761383
# TODO make lock free
13771384
def initialize
1378-
@ProbeSet = Concurrent::Channel::WaitableList.new
13791385
super()
1386+
@ProbeSet = Concurrent::Channel::WaitableList.new
13801387
end
13811388

13821389
def probe_set_size

lib/concurrent/edge/lock_free_linked_set.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def add(item)
5555

5656
node = Node.new item, curr
5757

58-
if pred.Successor_reference.compare_and_set curr, node, false, false
58+
if pred.successor_reference.compare_and_set curr, node, false, false
5959
return true
6060
end
6161
end
@@ -88,7 +88,7 @@ def contains?(item)
8888

8989
while curr < item
9090
curr = curr.next_node
91-
marked = curr.Successor_reference.marked?
91+
marked = curr.successor_reference.marked?
9292
end
9393

9494
curr == item && !marked
@@ -109,11 +109,11 @@ def remove(item)
109109
return false if curr != item
110110

111111
succ = curr.next_node
112-
removed = curr.Successor_reference.compare_and_set succ, succ, false, true
112+
removed = curr.successor_reference.compare_and_set succ, succ, false, true
113113

114114
next_node unless removed
115115

116-
pred.Successor_reference.compare_and_set curr, succ, false, false
116+
pred.successor_reference.compare_and_set curr, succ, false, false
117117

118118
return true
119119
end
@@ -134,9 +134,9 @@ def each
134134

135135
until curr.last?
136136
curr = curr.next_node
137-
marked = curr.Successor_reference.marked?
137+
marked = curr.successor_reference.marked?
138138

139-
yield curr.Data unless marked
139+
yield curr.data unless marked
140140
end
141141

142142
self

lib/concurrent/edge/lock_free_linked_set/node.rb

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,36 @@ class LockFreeLinkedSet
66
class Node < Synchronization::Object
77
include Comparable
88

9-
attr_reader :Data, :Successor_reference, :Key
9+
safe_initialization!
1010

1111
def initialize(data = nil, successor = nil)
12-
@Successor_reference = AtomicMarkableReference.new(successor || Tail.new)
13-
@Data = data
14-
@Key = key_for data
12+
super()
13+
@SuccessorReference = AtomicMarkableReference.new(successor || Tail.new)
14+
@Data = data
15+
@Key = key_for data
16+
end
17+
18+
def data
19+
@Data
20+
end
21+
22+
def successor_reference
23+
@SuccessorReference
24+
end
1525

16-
super() # ensures visibility
26+
def key
27+
@Key
1728
end
1829

1930
# Check to see if the node is the last in the list.
2031
def last?
21-
@Successor_reference.value.is_a? Tail
32+
@SuccessorReference.value.is_a? Tail
2233
end
2334

2435
# Next node in the list. Note: this is not the AtomicMarkableReference
2536
# of the next node, this is the actual Node itself.
2637
def next_node
27-
@Successor_reference.value
38+
@SuccessorReference.value
2839
end
2940

3041
# This method provides a unqiue key for the data which will be used for
@@ -47,7 +58,7 @@ def <=>(other)
4758
# a self-loop.
4859
class Tail < Node
4960
def initialize(_data = nil, _succ = nil)
50-
@Successor_reference = AtomicMarkableReference.new self
61+
@SuccessorReference = AtomicMarkableReference.new self
5162
end
5263

5364
# Always greater than other nodes. This means that traversal will end

0 commit comments

Comments
 (0)