Skip to content

Commit 37edfe3

Browse files
author
Petr Chalupa
committed
Merge pull request #284 from ruby-concurrency/synchronization
Synchronization updates: final and volatile fields, immutable struct
2 parents 8266271 + 67a82f7 commit 37edfe3

26 files changed

+859
-469
lines changed

doc/synchronization.md

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
`Synchronization` module provides common layer for synchronization. It provides same guaranties independent of any particular Ruby implementation.
2+
3+
*This is a new module, it is expected to fully stabilize for 1.0 release.*
4+
5+
## Synchronization::Object
6+
7+
Provides common parent for all objects which need to be synchronized or be using other synchronization tools. It provides:
8+
9+
- Synchronized block
10+
- Methods for waiting and signaling
11+
- Volatile fields
12+
- Ensure visibility of final fields
13+
- Fields with CAS operations
14+
15+
## Synchronized block
16+
17+
`Synchronization::Object` provides private method `#synchronize(&block)`. For a given object only one Thread can enter one of the blocks synchronized against this object. Object is locked when a thread enters one of the synchronized blocks.
18+
19+
Example of a simple counter which can be used by multiple threads:
20+
21+
```ruby
22+
class SafeCounter < Concurrent::Synchronization::Object
23+
def initialize
24+
super
25+
synchronize { @count = 0 }
26+
end
27+
28+
def increment
29+
synchronize { @count += 1 }
30+
end
31+
32+
def count
33+
synchronize { @count }
34+
end
35+
end
36+
```
37+
38+
### Naming conventions
39+
40+
Methods starting with `ns_` are marking methods that are not using synchronization by themselves, they have to be used inside synchronize block. They are usually used in pairs to separate the synchronization from behavior:
41+
42+
```ruby
43+
def compute
44+
service.report synchronize { ns_compute }
45+
end
46+
47+
private
48+
49+
def ns_compute
50+
ns_compute_reduce ns_compute_map
51+
end
52+
```
53+
where `compute` defines how is it synchronized and `ns_compute` handles the behavior (in this case the computation). `ns_` methods should only call other `ns_` methods or `pr_` methods. They can call normal methods on other objects, but that should be done with care (better to avoid) because the thread escapes this object while the lock is still held, which can lead to deadlock. That's why the `report` method is called in `compute` and not in `ns_compute`.
54+
55+
`pr_` methods are pure functions they can be used in and outside of synchronized blocks.
56+
57+
## Methods for waiting and signaling
58+
59+
Sometimes while already inside the synchronized block some condition is not met. Then the thread needs to wait (releasing the lock) until the condition is met. The waiting thread is then signaled that it can continue.
60+
61+
To fulfill these needs there are private methods:
62+
63+
- `ns_wait` {include:Concurrent::Synchronization::AbstractObject#ns_wait}
64+
- `ns_wait_until` {include:Concurrent::Synchronization::AbstractObject#ns_wait_until}
65+
- `ns_signal` {include:Concurrent::Synchronization::AbstractObject#ns_signal}
66+
- `ns_broadcast` {include:Concurrent::Synchronization::AbstractObject#ns_broadcast}
67+
68+
All methods have to be called inside synchronized block.
69+
70+
## Volatile fields
71+
72+
`Synchronization::Object` can have volatile fields (Java semantic). They are defined by `attr_volatile :field_name`. `attr_volatile` defines reader and writer with the `field_name`. Any write is always immediately visible for any subsequent reads of the same field.
73+
74+
## Ensure visibility of final fields
75+
76+
Instance variables assigned only once in `initialize` method are not guaranteed to be visible to all threads. For that user can call `ensure_ivar_visibility!` method, like in following example taken from `Edge::AbstractPromise` implementation:
77+
78+
```ruby
79+
class AbstractPromise < Synchronization::Object
80+
def initialize(future, *args, &block)
81+
super(*args, &block)
82+
@Future = future
83+
ensure_ivar_visibility!
84+
end
85+
# ...
86+
end
87+
```
88+
89+
### Naming conventions
90+
91+
Instance variables with camel case names are final and never reassigned.
92+
93+
## Fields with CAS operations
94+
95+
They are not supported directly, but AtomicReference can be stored in final field and then CAS operations can be done on it, like in following example taken from `Edge::Event` implementation:
96+
97+
```ruby
98+
class Event < Synchronization::Object
99+
extend FutureShortcuts
100+
101+
def initialize(promise, default_executor = :io)
102+
@Promise = promise
103+
@DefaultExecutor = default_executor
104+
@Touched = AtomicBoolean.new(false)
105+
super()
106+
ensure_ivar_visibility!
107+
end
108+
# ...
109+
def touch
110+
# distribute touch to promise only once
111+
@Promise.touch if @Touched.make_true
112+
self
113+
end
114+
# ...
115+
end
116+
```
117+
118+
## Memory model (sort of)
119+
120+
// TODO
121+

examples/benchmark_new_futures.rb

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
require 'benchmark/ips'
2+
require 'concurrent'
3+
require 'concurrent-edge'
4+
5+
scale = 1
6+
time = 10 * scale
7+
warmup = 2 * scale
8+
warmup *= 10 if Concurrent.on_jruby?
9+
10+
11+
Benchmark.ips(time, warmup) do |x|
12+
of = Concurrent::Promise.execute { 1 }
13+
nf = Concurrent.future { 1 }
14+
x.report('value-old') { of.value! }
15+
x.report('value-new') { nf.value! }
16+
x.compare!
17+
end
18+
19+
Benchmark.ips(time, warmup) do |x|
20+
x.report('graph-old') do
21+
head = Concurrent::Promise.execute { 1 }
22+
branch1 = head.then(&:succ)
23+
branch2 = head.then(&:succ).then(&:succ)
24+
Concurrent::Promise.zip(branch1, branch2).then { |(a, b)| a + b }.value!
25+
end
26+
x.report('graph-new') do
27+
head = Concurrent.future { 1 }
28+
branch1 = head.then(&:succ)
29+
branch2 = head.then(&:succ).then(&:succ)
30+
(branch1 + branch2).then { |(a, b)| a + b }.value!
31+
end
32+
x.compare!
33+
end
34+
35+
Benchmark.ips(time, warmup) do |x|
36+
x.report('immediate-old') { Concurrent::Promise.execute { nil }.value! }
37+
x.report('immediate-new') { Concurrent.future { nil }.value! }
38+
x.compare!
39+
end
40+
41+
Benchmark.ips(time, warmup) do |x|
42+
of = Concurrent::Promise.execute { 1 }
43+
nf = Concurrent.future { 1 }
44+
x.report('then-old') { of.then(&:succ).then(&:succ).value! }
45+
x.report('then-new') { nf.then(&:succ).then(&:succ).value! }
46+
x.compare!
47+
end
48+

ext/com/concurrent_ruby/ext/SynchronizationLibrary.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.jruby.runtime.builtin.IRubyObject;
1414
import org.jruby.runtime.load.Library;
1515
import org.jruby.runtime.Block;
16+
import org.jruby.runtime.Visibility;
1617
import org.jruby.RubyBoolean;
1718
import org.jruby.RubyNil;
1819
import org.jruby.runtime.ThreadContext;
@@ -47,19 +48,21 @@ public JavaObject(Ruby runtime, RubyClass metaClass) {
4748
super(runtime, metaClass);
4849
}
4950

50-
@JRubyMethod
51-
public IRubyObject initialize(ThreadContext context) {
52-
return context.nil;
51+
@JRubyMethod(rest = true)
52+
public IRubyObject initialize(ThreadContext context, IRubyObject[] args, Block block) {
53+
synchronized (this) {
54+
return callMethod(context, "ns_initialize", args, block);
55+
}
5356
}
5457

55-
@JRubyMethod(name = "synchronize")
58+
@JRubyMethod(name = "synchronize", visibility = Visibility.PRIVATE)
5659
public IRubyObject rubySynchronize(ThreadContext context, Block block) {
5760
synchronized (this) {
5861
return block.yield(context, null);
5962
}
6063
}
6164

62-
@JRubyMethod(name = "ns_wait", optional = 1)
65+
@JRubyMethod(name = "ns_wait", optional = 1, visibility = Visibility.PRIVATE)
6366
public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) {
6467
Ruby runtime = context.runtime;
6568
if (args.length > 1) {
@@ -91,16 +94,21 @@ public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) {
9194
return this;
9295
}
9396

94-
@JRubyMethod(name = "ns_signal")
97+
@JRubyMethod(name = "ns_signal", visibility = Visibility.PRIVATE)
9598
public IRubyObject nsSignal(ThreadContext context) {
9699
notify();
97100
return this;
98101
}
99102

100-
@JRubyMethod(name = "ns_broadcast")
103+
@JRubyMethod(name = "ns_broadcast", visibility = Visibility.PRIVATE)
101104
public IRubyObject nsBroadcast(ThreadContext context) {
102105
notifyAll();
103106
return this;
104107
}
108+
109+
@JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PRIVATE)
110+
public IRubyObject ensureIvarVisibilityBang(ThreadContext context) {
111+
return context.nil;
112+
}
105113
}
106114
}

ext/concurrent/extconf.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def create_dummy_makefile
1111
end
1212
end
1313

14-
if defined?(JRUBY_VERSION) || ! Concurrent.allow_c_extensions?
14+
if Concurrent.on_jruby? || ! Concurrent.allow_c_extensions?
1515
create_dummy_makefile
1616
warn 'C optimizations are not supported on this version of Ruby.'
1717
else

lib/concurrent/actor/core.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class Core < Synchronization::Object
4747
# any logging system
4848
# @param [Proc] block for class instantiation
4949
def initialize(opts = {}, &block)
50-
super(&nil)
50+
super(&nil) # TODO use ns_initialize
5151
synchronize do
5252
@mailbox = Array.new
5353
@serialized_execution = SerializedExecution.new

lib/concurrent/at_exit.rb

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,6 @@ module Concurrent
88
class AtExitImplementation < Synchronization::Object
99
include Logging
1010

11-
def initialize(enabled = true)
12-
super()
13-
synchronize do
14-
@handlers = {}
15-
@enabled = enabled
16-
end
17-
end
18-
1911
# Add a handler to be run at `Kernel#at_exit`
2012
# @param [Object] handler_id optionally provide an id, if allready present, handler is replaced
2113
# @yield the handler
@@ -80,6 +72,11 @@ def run
8072

8173
private
8274

75+
def ns_initialize(enabled = true)
76+
@handlers = {}
77+
@enabled = enabled
78+
end
79+
8380
def runner
8481
run if synchronize { @enabled }
8582
end

lib/concurrent/atomic/count_down_latch.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@ class PureCountDownLatch < Synchronization::Object
2121
#
2222
# @raise [ArgumentError] if `count` is not an integer or is less than zero
2323
def initialize(count = 1)
24-
super()
2524
unless count.is_a?(Fixnum) && count >= 0
2625
raise ArgumentError.new('count must be in integer greater than or equal zero')
2726
end
28-
synchronize { @count = count }
27+
super(count)
2928
end
3029

3130
# @!macro [attach] count_down_latch_method_wait
@@ -58,6 +57,12 @@ def count_down
5857
def count
5958
synchronize { @count }
6059
end
60+
61+
private
62+
63+
def ns_initialize(count)
64+
@count = count
65+
end
6166
end
6267

6368
if Concurrent.on_jruby?

lib/concurrent/atomic/cyclic_barrier.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,10 @@ class CyclicBarrier < Synchronization::Object
1515
#
1616
# @raise [ArgumentError] if `parties` is not an integer or is less than zero
1717
def initialize(parties, &block)
18-
super(&nil)
1918
if !parties.is_a?(Fixnum) || parties < 1
2019
raise ArgumentError.new('count must be in integer greater than or equal zero')
2120
end
22-
synchronize do
23-
@parties = parties
24-
@action = block
25-
ns_next_generation
26-
end
21+
super(parties, &block)
2722
end
2823

2924
# @return [Fixnum] the number of threads needed to pass the barrier
@@ -101,6 +96,11 @@ def ns_next_generation
10196
@number_waiting = 0
10297
end
10398

99+
def ns_initialize(parties, &block)
100+
@parties = parties
101+
@action = block
102+
ns_next_generation
103+
end
104104

105105
end
106106
end

lib/concurrent/atomic/event.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ class Event < Synchronization::Object
1919
# `Event` will block.
2020
def initialize
2121
super
22-
synchronize do
23-
@set = false
24-
@iteration = 0
25-
end
2622
end
2723

2824
# Is the object in the set state?
@@ -83,5 +79,10 @@ def ns_set
8379
end
8480
true
8581
end
82+
83+
def ns_initialize
84+
@set = false
85+
@iteration = 0
86+
end
8687
end
8788
end

lib/concurrent/configuration.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def self.global_timer_set
106106
end
107107

108108
# General access point to global executors.
109-
# @param [Symbol, Executor] maps symbols:
109+
# @param [Symbol, Executor] executor_identifier symbols:
110110
# - :fast - {Concurrent.global_fast_executor}
111111
# - :io - {Concurrent.global_io_executor}
112112
# - :immediate - {Concurrent.global_immediate_executor}

0 commit comments

Comments
 (0)