Skip to content

Commit bb4b9b0

Browse files
committed
Merge pull request #308 from ruby-concurrency/atom
Atom
2 parents d0a6d68 + 422f508 commit bb4b9b0

File tree

14 files changed

+501
-193
lines changed

14 files changed

+501
-193
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ This library contains a variety of concurrency abstractions at high and low leve
5353
### High-level, general-purpose asynchronous concurrency abstractions
5454

5555
* [Async](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Async.html): A mixin module that provides simple asynchronous behavior to any standard class/object or object.
56+
* [Atom](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Atom.html): A way to manage shared, synchronous, independent state.
5657
* [Future](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Future.html): An asynchronous operation that produces a value.
5758
* [Dataflow](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent.html#dataflow-class_method): Built on Futures, Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available.
5859
* [Promise](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html): Similar to Futures, with more features.

doc/agent.md

Lines changed: 0 additions & 82 deletions
This file was deleted.

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
require 'concurrent/struct'
1414

1515
require 'concurrent/atomic/atomic_reference'
16+
require 'concurrent/atom'
1617
require 'concurrent/async'
1718
require 'concurrent/dataflow'
1819
require 'concurrent/delay'

lib/concurrent/agent.rb

Lines changed: 82 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,74 @@
66

77
module Concurrent
88

9-
# {include:file:doc/agent.md}
9+
# `Agent`s are inspired by [Clojure's](http://clojure.org/) [agent](http://clojure.org/agents) function. An `Agent` is a single atomic value that represents an identity. The current value of the `Agent` can be requested at any time (`deref`). Each `Agent` has a work queue and operates on the global thread pool (see below). Consumers can `post` code blocks to the `Agent`. The code block (function) will receive the current value of the `Agent` as its sole parameter. The return value of the block will become the new value of the `Agent`. `Agent`s support two error handling modes: fail and continue. A good example of an `Agent` is a shared incrementing counter, such as the score in a video game.
10+
#
11+
# An `Agent` must be initialize with an initial value. This value is always accessible via the `value` (or `deref`) methods. Code blocks sent to the `Agent` will be processed in the order received. As each block is processed the current value is updated with the result from the block. This update is an atomic operation so a `deref` will never block and will always return the current value.
12+
#
13+
# When an `Agent` is created it may be given an optional `validate` block and zero or more `rescue` blocks. When a new value is calculated the value will be checked against the validator, if present. If the validator returns `true` the new value will be accepted. If it returns `false` it will be rejected. If a block raises an exception during execution the list of `rescue` blocks will be seacrhed in order until one matching the current exception is found. That `rescue` block will then be called an passed the exception object. If no matching `rescue` block is found, or none were configured, then the exception will be suppressed.
14+
#
15+
# `Agent`s also implement Ruby's [Observable](http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html). Code that observes an `Agent` will receive a callback with the new value any time the value is changed.
16+
#
17+
# @!macro copy_options
18+
#
19+
# @example Simple Example
20+
#
21+
# require 'concurrent'
22+
#
23+
# score = Concurrent::Agent.new(10)
24+
# score.value #=> 10
25+
#
26+
# score << proc{|current| current + 100 }
27+
# sleep(0.1)
28+
# score.value #=> 110
29+
#
30+
# score << proc{|current| current * 2 }
31+
# sleep(0.1)
32+
# score.value #=> 220
33+
#
34+
# score << proc{|current| current - 50 }
35+
# sleep(0.1)
36+
# score.value #=> 170
37+
#
38+
# @example With Validation and Error Handling
39+
#
40+
# score = Concurrent::Agent.new(0).validate{|value| value <= 1024 }.
41+
# rescue(NoMethodError){|ex| puts "Bam!" }.
42+
# rescue(ArgumentError){|ex| puts "Pow!" }.
43+
# rescue{|ex| puts "Boom!" }
44+
# score.value #=> 0
45+
#
46+
# score << proc{|current| current + 2048 }
47+
# sleep(0.1)
48+
# score.value #=> 0
49+
#
50+
# score << proc{|current| raise ArgumentError }
51+
# sleep(0.1)
52+
# #=> puts "Pow!"
53+
# score.value #=> 0
54+
#
55+
# score << proc{|current| current + 100 }
56+
# sleep(0.1)
57+
# score.value #=> 100
58+
#
59+
# @example With Observation
60+
#
61+
# bingo = Class.new{
62+
# def update(time, score)
63+
# puts "Bingo! [score: #{score}, time: #{time}]" if score >= 100
64+
# end
65+
# }.new
66+
#
67+
# score = Concurrent::Agent.new(0)
68+
# score.add_observer(bingo)
69+
#
70+
# score << proc{|current| sleep(0.1); current += 30 }
71+
# score << proc{|current| sleep(0.1); current += 30 }
72+
# score << proc{|current| sleep(0.1); current += 30 }
73+
# score << proc{|current| sleep(0.1); current += 30 }
74+
#
75+
# sleep(1)
76+
# #=> Bingo! [score: 120, time: 2013-07-22 21:26:08 -0400]
1077
#
1178
# @!attribute [r] timeout
1279
# @return [Fixnum] the maximum number of seconds before an update is cancelled
@@ -21,18 +88,7 @@ class Agent
2188
#
2289
# @param [Object] initial the initial value
2390
#
24-
# @!macro [attach] executor_and_deref_options
25-
#
26-
# @param [Hash] opts the options used to define the behavior at update and deref
27-
# and to specify the executor on which to perform actions
28-
# @option opts [Executor] :executor when set use the given `Executor` instance.
29-
# Three special values are also supported: `:task` returns the global task pool,
30-
# `:operation` returns the global operation pool, and `:immediate` returns a new
31-
# `ImmediateExecutor` object.
32-
# @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data
33-
# @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data
34-
# @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing
35-
# the internal value and returning the value returned from the proc
91+
# @!macro executor_and_deref_options
3692
def initialize(initial, opts = {})
3793
@value = initial
3894
@rescuers = []
@@ -128,19 +184,19 @@ def post(&block)
128184
def post_off(timeout = nil, &block)
129185
warn '[DEPRECATED] post_off with timeout options is deprecated and will be removed'
130186
task = if timeout
131-
lambda do |value|
132-
future = Future.execute do
133-
block.call(value)
134-
end
135-
if future.wait(timeout)
136-
future.value!
137-
else
138-
raise Concurrent::TimeoutError
139-
end
140-
end
141-
else
142-
block
143-
end
187+
lambda do |value|
188+
future = Future.execute do
189+
block.call(value)
190+
end
191+
if future.wait(timeout)
192+
future.value!
193+
else
194+
raise Concurrent::TimeoutError
195+
end
196+
end
197+
else
198+
block
199+
end
144200
post_on(@io_executor, &task)
145201
end
146202

lib/concurrent/atom.rb

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
require 'concurrent/dereferenceable'
2+
require 'concurrent/atomic/atomic_reference'
3+
require 'concurrent/synchronization/object'
4+
5+
module Concurrent
6+
7+
# Atoms provide a way to manage shared, synchronous, independent state.
8+
#
9+
# An atom is initialized with an initial value and an optional validation
10+
# proc. At any time the value of the atom can be synchronously and safely
11+
# changed. If a validator is given at construction then any new value
12+
# will be checked against the validator and will be rejected if the
13+
# validator returns false or raises an exception.
14+
#
15+
# There are two ways to change the value of an atom: {#compare_and_set} and
16+
# {#swap}. The former will set the new value if and only if it validates and
17+
# the current value matches the new value. The latter will atomically set the
18+
# new value to the result of running the given block if and only if that
19+
# value validates.
20+
#
21+
# @!macro copy_options
22+
#
23+
# @see http://clojure.org/atoms Clojure Atoms
24+
class Atom < Synchronization::Object
25+
include Dereferenceable
26+
27+
# @!macro [attach] atom_initialize
28+
#
29+
# Create a new atom with the given initial value.
30+
#
31+
# @param [Object] value The initial value
32+
# @param [Hash] opts The options used to configure the atom
33+
# @option opts [Proc] :validator (nil) Optional proc used to validate new
34+
# values. It must accept one and only one argument which will be the
35+
# intended new value. The validator will return true if the new value
36+
# is acceptable else return false (preferrably) or raise an exception.
37+
#
38+
# @!macro deref_options
39+
#
40+
# @raise [ArgumentError] if the validator is not a `Proc` (when given)
41+
def initialize(value, opts = {})
42+
super()
43+
synchronize{ ns_initialize(value, opts) }
44+
end
45+
46+
# The current value of the atom.
47+
#
48+
# @return [Object] The current value.
49+
def value
50+
apply_deref_options(@value.value)
51+
end
52+
alias_method :deref, :value
53+
54+
# Atomically swaps the value of atom using the given block. The current
55+
# value will be passed to the block, as will any arguments passed as
56+
# arguments to the function. The new value will be validated against the
57+
# (optional) validator proc given at construction. If validation fails the
58+
# value will not be changed.
59+
#
60+
# Internally, {#swap} reads the current value, applies the block to it, and
61+
# attempts to compare-and-set it in. Since another thread may have changed
62+
# the value in the intervening time, it may have to retry, and does so in a
63+
# spin loop. The net effect is that the value will always be the result of
64+
# the application of the supplied block to a current value, atomically.
65+
# However, because the block might be called multiple times, it must be free
66+
# of side effects.
67+
#
68+
# @note The given block may be called multiple times, and thus should be free
69+
# of side effects.
70+
#
71+
# @param [Object] args Zero or more arguments passed to the block.
72+
#
73+
# @yield [value, args] Calculates a new value for the atom based on the
74+
# current value and any supplied agruments.
75+
# @yieldparam value [Object] The current value of the atom.
76+
# @yieldparam args [Object] All arguments passed to the function, in order.
77+
# @yieldreturn [Object] The intended new value of the atom.
78+
#
79+
# @return [Object] The final value of the atom after all operations and
80+
# validations are complete.
81+
#
82+
# @raise [ArgumentError] When no block is given.
83+
def swap(*args)
84+
raise ArgumentError.new('no block given') unless block_given?
85+
86+
begin
87+
loop do
88+
old_value = @value.value
89+
new_value = yield(old_value, *args)
90+
return old_value unless @validator.call(new_value)
91+
return new_value if compare_and_set!(old_value, new_value)
92+
end
93+
rescue
94+
return @value.value
95+
end
96+
end
97+
98+
# @!macro [attach] atom_compare_and_set
99+
# Atomically sets the value of atom to the new value if and only if the
100+
# current value of the atom is identical to the old value and the new
101+
# value successfully validates against the (optional) validator given
102+
# at construction.
103+
#
104+
# @param [Object] old_value The expected current value.
105+
# @param [Object] new_value The intended new value.
106+
#
107+
# @return [Boolean] True if the value is changed else false.
108+
def compare_and_set(old_value, new_value)
109+
compare_and_set!(old_value, new_value)
110+
rescue
111+
false
112+
end
113+
114+
private
115+
116+
# @!macro atom_initialize
117+
# @!visibility private
118+
def ns_initialize(value, opts)
119+
@validator = opts.fetch(:validator, ->(v){ true })
120+
raise ArgumentError.new('validator must be a proc') unless @validator.is_a? Proc
121+
@value = Concurrent::AtomicReference.new(value)
122+
ns_set_deref_options(opts)
123+
end
124+
125+
# @!macro atom_compare_and_set
126+
# @raise [Exception] if the validator proc raises an exception
127+
# @!visibility private
128+
def compare_and_set!(old_value, new_value)
129+
if @validator.call(new_value) # may raise exception
130+
@value.compare_and_set(old_value, new_value)
131+
else
132+
false
133+
end
134+
end
135+
end
136+
end

lib/concurrent/delay.rb

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ module Concurrent
2929
# `Delay` includes the `Concurrent::Dereferenceable` mixin to support thread
3030
# safety of the reference returned by `#value`.
3131
#
32+
# @!macro copy_options
33+
#
3234
# @!macro [attach] delay_note_regarding_blocking
3335
# @note The default behavior of `Delay` is to block indefinitely when
3436
# calling either `value` or `wait`, executing the delayed operation on
@@ -51,7 +53,15 @@ class Delay < Synchronization::Object
5153

5254
# Create a new `Delay` in the `:pending` state.
5355
#
54-
# @!macro executor_and_deref_options
56+
# @!macro [attach] executor_and_deref_options
57+
#
58+
# @param [Hash] opts the options used to define the behavior at update and deref
59+
# and to specify the executor on which to perform actions
60+
# @option opts [Executor] :executor when set use the given `Executor` instance.
61+
# Three special values are also supported: `:task` returns the global task pool,
62+
# `:operation` returns the global operation pool, and `:immediate` returns a new
63+
# `ImmediateExecutor` object.
64+
# @!macro deref_options
5565
#
5666
# @yield the delayed operation to perform
5767
#

0 commit comments

Comments
 (0)