
Commit 983db3c

Author: Petr Chalupa (committed)
Merge pull request #296 from ruby-concurrency/synchronization
Synchronization 3.update
2 parents ff3b8cf + 3fc5fe5 commit 983db3c


47 files changed: +1254 -495 lines

doc/synchronization.md

Lines changed: 57 additions & 3 deletions
@@ -88,7 +88,7 @@ end
 
 ### Naming conventions
 
-Instance variables with camel case names are final and never reassigned.
+Instance variables with camel-case names are final and never reassigned, e.g. `@FinalVariable`.
 
 ## Fields with CAS operations
 
@@ -115,7 +115,61 @@ class Event < Synchronization::Object
 end
 ```
 
-## Memory model (sort of)
+Operations on the `@Touched` field have volatile semantics.
 
-// TODO
+## Memory model
+
+*Intended for further revision and extension.*
+
+When writing libraries in `concurrent-ruby` we reason based on the following memory model, which is further extended by the features provided in `Synchronization::Object` (described above).
+
+The memory model is constructed from our best-effort knowledge of the 3 main Ruby implementations (CRuby, JRuby, Rubinius). When considering a given aspect we always choose the weakest guarantee (e.g. local variable updates are always visible in CRuby but not in JRuby, so in this case the JRuby behavior is picked). If some Ruby behavior is omitted here, it is considered unsafe for use in a parallel environment (reasons may be lack of information, or difficulty of verification).
+
+This takes into account the following implementations:
+
+- CRuby 1.9 - 2.2 (no differences found)
+- JRuby 1.7
+- JRuby 9 *not examined yet, same behavior as in 1.7 assumed*
+- Rubinius 2.5
+
+We are interested in the following behaviors:
+
+- **volatility** - in Java's sense: any written value is immediately visible to any subsequent reads, including all writes leading to that value.
+- **atomicity** - an operation is either done or not done as a whole.
+
+### Variables
+
+- **Local variables** - atomic assignment, non-volatile.
+    - Consequence: a lambda defined on `thread1` executing on `thread2` may not see updated values in local variables captured in its closure.
+    - Reason: local variables are non-volatile on JRuby and Rubinius.
+- **Instance variables** - atomic assignment, non-volatile.
+    - Consequence: a different thread may see old values; a different thread may see a not fully initialized object.
+    - Reason: instance variables are non-volatile on JRuby and Rubinius.
+- **Constants** - atomic assignment, volatile.
+
+Other:
+
+- **Global variables** - we don't use them, omitted (atomic and volatile on JRuby and CRuby, Rubinius unknown)
+- **Class variables** - we don't use them, omitted (atomic and volatile on JRuby and CRuby, Rubinius unknown)
+
+### Assumptions
+
+The following operations are **assumed** thread-safe, volatile and atomic on all implementations:
+
+- Class definition
+- Method definition
+- Library requirement
+
+It is best practice, though, to eager-load before entering the parallel part of an application.
+
+### Issues to be aware of
+
+- **Initialization** - Since instance variables are not volatile and a particular implementation may preinitialize them with nils based on object shapes it has already seen, a second thread obtaining a reference to a newly constructed object may still see the old preinitialized values instead of the values set in the `initialize` method. To fix this, `ensure_ivar_visibility!` can be used, or the object can be safely published through a volatile field.
+- **`||=`, `+=` and similar** - are not atomic.
+
+### Notes/Sources on implementations
+
+- [JRuby wiki page on concurrency](https://github.com/jruby/jruby/wiki/Concurrency-in-jruby)
+- [Rubinius page on concurrency](http://rubini.us/doc/en/systems/concurrency/)
+- CRuby has a GVL. Any GVL release and acquire uses a lock, which means that all writes done by a releasing thread will be visible to the next acquiring thread. See: <https://github.com/ruby/ruby/blob/ruby_2_2/thread_pthread.c#L101-L107>
 

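The memory-model notes above warn that `||=`, `+=` and similar operators are not atomic: each expands to a read, an operation, and a write, so a second thread can interleave between the read and the write and lose an update. A minimal sketch of the fix using only stdlib `Thread` and `Mutex` (not a concurrent-ruby API):

```ruby
lock = Mutex.new
safe   = 0
unsafe = 0

threads = 10.times.map do
  Thread.new do
    1_000.times do
      unsafe += 1                      # unsynchronized read-modify-write, updates may be lost
      lock.synchronize { safe += 1 }   # the same update made atomic by a lock
    end
  end
end
threads.each(&:join)

puts safe # always 10000; `unsafe` may be less on some runs/implementations
```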
examples/benchmark_new_futures.rb

Lines changed: 50 additions & 14 deletions
@@ -2,47 +2,83 @@
 require 'concurrent'
 require 'concurrent-edge'
 
+# require 'ruby-prof'
+#
+# result = RubyProf.profile do
+#   1000.times do
+#     head    = Concurrent.future { 1 }
+#     branch1 = head.then(&:succ)
+#     branch2 = head.then(&:succ).then(&:succ)
+#     branch3 = head.then(&:succ).then(&:succ).then(&:succ)
+#     Concurrent.join(branch1, branch2, branch3).then { |(a, b, c)| a + b + c }.value!
+#   end
+# end
+#
+# printer = RubyProf::FlatPrinter.new(result)
+# printer.print(STDOUT)
+#
+# printer = RubyProf::GraphPrinter.new(result)
+# printer.print(STDOUT, {})
+#
+# exit
 
 scale = 1
 time = 10 * scale
 warmup = 2 * scale
 warmup *= 10 if Concurrent.on_jruby?
 
+Benchmark.ips(time, warmup) do |x|
+  x.report('flat-old') { Concurrent::Promise.execute { 1 }.flat_map { |v| Concurrent::Promise.execute { v + 2 } }.value! }
+  x.report('flat-new') { Concurrent.future(:fast) { 1 }.then { |v| Concurrent.future(:fast) { v + 1 } }.flat.value! }
+  x.compare!
+end
+
+Benchmark.ips(time, warmup) do |x|
+  x.report('status-old') { f = Concurrent::Promise.execute { nil }; 100.times { f.complete? } }
+  x.report('status-new') { f = Concurrent.future(:fast) { nil }; 100.times { f.completed? } }
+  x.compare!
+end
 
 Benchmark.ips(time, warmup) do |x|
   of = Concurrent::Promise.execute { 1 }
-  nf = Concurrent.future { 1 }
+  nf = Concurrent.future(:fast) { 1 }
   x.report('value-old') { of.value! }
   x.report('value-new') { nf.value! }
   x.compare!
 end
 
 Benchmark.ips(time, warmup) do |x|
   x.report('graph-old') do
-    head = Concurrent::Promise.execute { 1 }
-    branch1 = head.then(&:succ)
-    branch2 = head.then(&:succ).then(&:succ)
-    Concurrent::Promise.zip(branch1, branch2).then { |(a, b)| a + b }.value!
+    head = Concurrent::Promise.execute { 1 }
+    10.times do
+      branch1 = head.then(&:succ)
+      branch2 = head.then(&:succ).then(&:succ)
+      head    = Concurrent::Promise.zip(branch1, branch2).then { |a, b| a + b }
+    end
+    head.value!
   end
   x.report('graph-new') do
-    head = Concurrent.future { 1 }
-    branch1 = head.then(&:succ)
-    branch2 = head.then(&:succ).then(&:succ)
-    (branch1 + branch2).then { |(a, b)| a + b }.value!
+    head = Concurrent.future(:fast) { 1 }
+    10.times do
+      branch1 = head.then(&:succ)
+      branch2 = head.then(&:succ).then(&:succ)
+      head    = (branch1 & branch2).then { |a, b| a + b }
+    end
+    head.value!
   end
   x.compare!
 end
 
 Benchmark.ips(time, warmup) do |x|
   x.report('immediate-old') { Concurrent::Promise.execute { nil }.value! }
-  x.report('immediate-new') { Concurrent.future { nil }.value! }
+  x.report('immediate-new') { Concurrent.future(:fast) { nil }.value! }
   x.compare!
 end
 
 Benchmark.ips(time, warmup) do |x|
   of = Concurrent::Promise.execute { 1 }
-  nf = Concurrent.future { 1 }
-  x.report('then-old') { of.then(&:succ).then(&:succ).value! }
-  x.report('then-new') { nf.then(&:succ).then(&:succ).value! }
+  nf = Concurrent.future(:fast) { 1 }
+  x.report('then-old') { 100.times.reduce(of) { |f, _| f.then(&:succ) }.value! }
+  x.report('then-new') { 100.times.reduce(nf) { |f, _| f.then(&:succ) }.value! }
   x.compare!
 end
-

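The `then-old`/`then-new` reports build their work by folding `then` over a seed future with `Enumerable#reduce`. The same fold on a plain integer (a sketch with no futures involved) shows what a 100-step `succ` chain computes:

```ruby
# Fold `succ` 100 times over the seed value 1, mirroring how the
# benchmark folds `then(&:succ)` 100 times over a seed future.
chain = 100.times.reduce(1) { |acc, _| acc.succ }
puts chain # 101
```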
examples/edge_futures.in.rb

Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@
+### Simple asynchronous task
+
+future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately
+future.completed?
+# block until evaluated
+future.value
+future.completed?
+
+
+### Failing asynchronous task
+
+future = Concurrent.future { raise 'Boom' }
+future.value
+future.value! rescue $!
+future.reason
+# re-raising
+raise future rescue $!
+
+
+### Chaining
+
+head    = Concurrent.future { 1 } #
+branch1 = head.then(&:succ) #
+branch2 = head.then(&:succ).then(&:succ) #
+branch1.zip(branch2).value
+(branch1 & branch2).then { |(a, b)| a + b }.value
+# pick only the first to complete
+(branch1 | branch2).value
+
+
+### Error handling
+
+Concurrent.future { Object.new }.then(&:succ).then(&:succ).rescue { |e| e.class }.value # the error propagates
+Concurrent.future { Object.new }.then(&:succ).rescue { 1 }.then(&:succ).value
+Concurrent.future { 1 }.then(&:succ).rescue { |e| e.message }.then(&:succ).value
+
+
+### Delay
+
+# will not evaluate until asked by #value or another method requiring completion
+future = Concurrent.delay { 'lazy' }
+sleep 0.1 #
+future.completed?
+future.value
+
+# propagates through the chain, allowing whole or partial lazy chains
+
+head    = Concurrent.delay { 1 }
+branch1 = head.then(&:succ)
+branch2 = head.delay.then(&:succ)
+join    = branch1 & branch2
+
+sleep 0.1 # nothing will complete
+[head, branch1, branch2, join].map(&:completed?)
+
+branch1.value
+sleep 0.1 # forces only head to complete, branch2 stays incomplete
+[head, branch1, branch2, join].map(&:completed?)
+
+join.value
+
+
+### Flatting
+
+Concurrent.future { Concurrent.future { 1 + 1 } }.flat.value # waits for the inner future
+
+# a more complicated example
+Concurrent.future { Concurrent.future { Concurrent.future { 1 + 1 } } }.
+    flat(1).
+    then { |f| f.then(&:succ) }.
+    flat(1).value
+
+
+### Schedule
+
+scheduled = Concurrent.schedule(0.1) { 1 }
+
+scheduled.completed?
+scheduled.value # available after 0.1 sec
+
+# and in a chain
+scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ)
+# will not be scheduled until the value is requested
+sleep 0.1 #
+scheduled.value # returns after another 0.1 sec
+
+
+### Completable Future and Event
+
+future = Concurrent.future
+event  = Concurrent.event
+
+# will be blocked until completed
+t1 = Thread.new { future.value } #
+t2 = Thread.new { event.wait } #
+
+future.success 1
+future.success 1 rescue $!
+future.try_success 2
+event.complete
+
+[t1, t2].each &:join #
+
+
+### Callbacks
+
+queue  = Queue.new
+future = Concurrent.delay { 1 + 1 }
+
+future.on_success { queue << 1 } # evaluated asynchronously
+future.on_success! { queue << 2 } # evaluated on the completing thread
+
+queue.empty?
+future.value
+queue.pop
+queue.pop
+
+
+### Thread-pools
+
+Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait
+
+
+### Interoperability with actors
+
+actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
+  -> v { v ** 2 }
+end
+
+Concurrent.
+    future { 2 }.
+    then_ask(actor).
+    then { |v| v + 2 }.
+    value
+
+actor.ask(2).then(&:succ).value
+
+
+### Common use-case examples
+
+# simple background processing
+Concurrent.future { do_stuff }
+
+# parallel background processing
+jobs = 10.times.map { |i| Concurrent.future { i } } #
+Concurrent.zip(*jobs).value
+
+
+# a periodic task
+def schedule_job
+  Concurrent.schedule(1) { do_stuff }.
+      rescue { |e| report_error e }.
+      then { schedule_job }
+end
+
+schedule_job

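The Delay section in `edge_futures.in.rb` relies on evaluation being deferred until a value is demanded. A minimal single-threaded stdlib sketch of that semantic (`LazyFuture` is a hypothetical stand-in for illustration, not the library's class, and it ignores thread safety):

```ruby
# Defers the block until #value is first called, then caches the result,
# mimicking the observable behavior of Concurrent.delay in a single thread.
class LazyFuture
  def initialize(&block)
    @block     = block
    @completed = false
  end

  def completed?
    @completed
  end

  def value
    unless @completed
      @value     = @block.call
      @completed = true
    end
    @value
  end
end

future = LazyFuture.new { 'lazy' }
puts future.completed? # false -- nothing has run yet
puts future.value      # lazy
puts future.completed? # true
```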
0 commit comments
