Skip to content

Commit 3efc2f0

Browse files
committed
Atom updates.
- No longer `Dereferenceable` - Now `Observable` - Added a `#reset` method
1 parent 360ef10 commit 3efc2f0

File tree

5 files changed

+126
-76
lines changed

5 files changed

+126
-76
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
- Each object uses its own `SingleThreadExecutor` instead of the global thread pool.
66
- No longers supports executor injection
77
- Much better documentation
8+
* `Atom` updates
9+
- No longer `Dereferenceable`
10+
- Now `Observable`
11+
- Added a `#reset` method
812

913
## Current Release v1.0.0.pre1 (19 Aug 2015)
1014

lib/concurrent/atom.rb

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
require 'concurrent/concern/dereferenceable'
21
require 'concurrent/atomic/atomic_reference'
2+
require 'concurrent/collection/copy_on_notify_observer_set'
3+
require 'concurrent/concern/observable'
34
require 'concurrent/synchronization/object'
45

56
module Concurrent
@@ -18,11 +19,9 @@ module Concurrent
1819
# new value to the result of running the given block if and only if that
1920
# value validates.
2021
#
21-
# @!macro copy_options
22-
#
2322
# @see http://clojure.org/atoms Clojure Atoms
2423
class Atom < Synchronization::Object
25-
include Concern::Dereferenceable
24+
include Concern::Observable
2625

2726
# Create a new atom with the given initial value.
2827
#
@@ -38,20 +37,18 @@ class Atom < Synchronization::Object
3837
# @raise [ArgumentError] if the validator is not a `Proc` (when given)
3938
def initialize(value, opts = {})
4039
super()
41-
42-
@validator = opts.fetch(:validator, ->(v){ true })
43-
raise ArgumentError.new('validator must be a proc') unless @validator.is_a? Proc
44-
45-
@value = Concurrent::AtomicReference.new(value)
46-
ns_set_deref_options(opts)
47-
ensure_ivar_visibility!
40+
synchronize do
41+
@validator = opts.fetch(:validator, ->(v){ true })
42+
@value = Concurrent::AtomicReference.new(value)
43+
self.observers = Collection::CopyOnNotifyObserverSet.new
44+
end
4845
end
4946

5047
# The current value of the atom.
5148
#
5249
# @return [Object] The current value.
5350
def value
54-
apply_deref_options(@value.value)
51+
@value.value
5552
end
5653
alias_method :deref, :value
5754

@@ -87,45 +84,66 @@ def value
8784
def swap(*args)
8885
raise ArgumentError.new('no block given') unless block_given?
8986

90-
begin
91-
loop do
92-
old_value = @value.value
87+
loop do
88+
old_value = @value.value
89+
begin
9390
new_value = yield(old_value, *args)
94-
return old_value unless @validator.call(new_value)
95-
return new_value if compare_and_set!(old_value, new_value)
91+
break old_value unless valid?(new_value)
92+
break new_value if compare_and_set(old_value, new_value)
93+
rescue
94+
break old_value
9695
end
97-
rescue
98-
return @value.value
9996
end
10097
end
10198

102-
# @!macro [attach] atom_compare_and_set
103-
# Atomically sets the value of atom to the new value if and only if the
104-
# current value of the atom is identical to the old value and the new
105-
# value successfully validates against the (optional) validator given
106-
# at construction.
99+
# Atomically sets the value of atom to the new value if and only if the
100+
# current value of the atom is identical to the old value and the new
101+
# value successfully validates against the (optional) validator given
102+
# at construction.
107103
#
108-
# @param [Object] old_value The expected current value.
109-
# @param [Object] new_value The intended new value.
104+
# @param [Object] old_value The expected current value.
105+
# @param [Object] new_value The intended new value.
110106
#
111-
# @return [Boolean] True if the value is changed else false.
107+
# @return [Boolean] True if the value is changed else false.
112108
def compare_and_set(old_value, new_value)
113-
compare_and_set!(old_value, new_value)
114-
rescue
115-
false
109+
if valid?(new_value) && @value.compare_and_set(old_value, new_value)
110+
observers.notify_observers(Time.now, new_value, nil)
111+
true
112+
else
113+
false
114+
end
116115
end
117116

118-
private
119-
120-
# @!macro atom_compare_and_set
121-
# @raise [Exception] if the validator proc raises an exception
122-
# @!visibility private
123-
def compare_and_set!(old_value, new_value)
124-
if @validator.call(new_value) # may raise exception
125-
@value.compare_and_set(old_value, new_value)
117+
# Atomically sets the value of atom to the new value without regard for the
118+
# current value so long as the new value successfully validates against the
119+
# (optional) validator given at construction.
120+
#
121+
# @param [Object] new_value The intended new value.
122+
#
123+
# @return [Object] The final value of the atom after all operations and
124+
# validations are complete.
125+
def reset(new_value)
126+
old_value = @value.value
127+
if valid?(new_value)
128+
@value.set(new_value)
129+
observers.notify_observers(Time.now, new_value, nil)
130+
new_value
126131
else
127-
false
132+
old_value
128133
end
129134
end
135+
136+
private
137+
138+
# Is the new value valid?
139+
#
140+
# @param [Object] new_value The intended new value.
141+
# @return [Boolean] false if the validator function returns false or raises
142+
# an exception else true
143+
def valid?(new_value)
144+
@validator.call(new_value)
145+
rescue
146+
false
147+
end
130148
end
131149
end

lib/concurrent/concern/observable.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ module Concern
2121
#
2222
# In a multi threaded environment things are more complex. The `subject` must
2323
# synchronize the access to its data structure and to do so currently we're
24-
# using two specialized ObserverSet: CopyOnWriteObserverSet and
25-
# CopyOnNotifyObserverSet.
24+
# using two specialized ObserverSet: {Concurrent::Concern::CopyOnWriteObserverSet}
25+
# and {Concurrent::Concern::CopyOnNotifyObserverSet}.
2626
#
2727
# When implementing and `observer` there's a very important rule to remember:
2828
# **there are no guarantees about the thread that will execute the callback**

spec/concurrent/atom_spec.rb

Lines changed: 63 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,15 @@
1-
require_relative 'concern/dereferenceable_shared'
1+
require_relative 'concern/observable_shared'
22

33
module Concurrent
44

55
describe Atom do
66

7-
it_should_behave_like :dereferenceable do
8-
def dereferenceable_subject(value, opts = {})
9-
Atom.new(value, opts)
10-
end
11-
end
12-
137
context 'construction' do
148

159
it 'sets the initial value to the given value' do
1610
atom = Atom.new(42)
1711
expect(atom.value).to eq 42
1812
end
19-
20-
it 'raises an exception if the validator is not a proc' do
21-
expect {
22-
Atom.new(42, validator: 42)
23-
}.to raise_error(ArgumentError)
24-
end
2513
end
2614

2715
context '#compare_and_set' do
@@ -75,7 +63,7 @@ def dereferenceable_subject(value, opts = {})
7563
end
7664
end
7765

78-
context 'swap' do
66+
context '#swap' do
7967

8068
it 'raises an exception when no block is given' do
8169
atom = Atom.new(42)
@@ -141,25 +129,70 @@ def dereferenceable_subject(value, opts = {})
141129
expect(atom.swap{ 100 }).to eq 42
142130
end
143131

144-
#it 'calls the block more than once if the value changes underneath' do
145-
#latch = Concurrent::CountDownLatch.new
146-
#counter = Concurrent::AtomicBoolean.new(0)
147-
#atom = Atom.new(0)
132+
it 'calls the block more than once if the value changes underneath' do
133+
latch1 = Concurrent::CountDownLatch.new
134+
latch2 = Concurrent::CountDownLatch.new
135+
counter = Concurrent::AtomicFixnum.new(0)
136+
atom = Atom.new(0)
137+
138+
t = Thread.new do
139+
atom.swap do |value|
140+
latch1.count_down
141+
latch2.wait(1)
142+
counter.increment
143+
42
144+
end
145+
end
146+
147+
latch1.wait(1)
148+
atom.swap{ 100 }
149+
latch2.count_down
150+
t.join(1)
151+
152+
expect(counter.value).to be > 1
153+
end
154+
end
155+
156+
context '#reset' do
157+
158+
it 'sets the new value' do
159+
atom = Atom.new(42)
160+
atom.reset(:foo)
161+
expect(atom.value).to eq :foo
162+
end
163+
164+
it 'returns the new value on success' do
165+
atom = Atom.new(42)
166+
expect(atom.reset(:foo)).to eq :foo
167+
end
168+
169+
it 'returns the new value on success' do
170+
atom = Atom.new(42)
171+
expect(atom.reset(:foo)).to eq :foo
172+
end
173+
174+
it 'returns the old value if the validator returns false' do
175+
validator = ->(value){ false }
176+
atom = Atom.new(42, validator: validator)
177+
expect(atom.reset(:foo)).to eq 42
178+
end
179+
180+
it 'returns the old value if the validator raises an exception' do
181+
validator = ->(value){ raise StandardError }
182+
atom = Atom.new(42, validator: validator)
183+
expect(atom.reset(:foo)).to eq 42
184+
end
185+
end
148186

149-
#t = Thread.new do
150-
#atom.swap do |value|
151-
#counter.increment
152-
#latch.wait
153-
#42
154-
#end
155-
#end
187+
context :observable do
156188

157-
#atom.swap{ 100 }
158-
#latch.count_down
159-
#t.join(1)
189+
subject { Atom.new(0) }
190+
191+
def trigger_observable(observable)
192+
observable.reset(42)
193+
end
160194

161-
#expect(counter.value).to eq 2
162-
#end
195+
it_behaves_like :observable
163196
end
164197
end
165198
end

spec/concurrent/timer_task_spec.rb

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,6 @@
44
module Concurrent
55

66
describe TimerTask do
7-
#before(:each) do
8-
## suppress deprecation warnings.
9-
#allow_any_instance_of(Concurrent::TimerTask).to receive(:warn)
10-
#allow(Concurrent::TimerTask).to receive(:warn)
11-
#end
127

138
context :dereferenceable do
149

0 commit comments

Comments
 (0)