@@ -19,13 +19,13 @@ module Concurrent
19
19
# and no blocking receive. The state of an Agent should be itself immutable
20
20
# and the `#value` of an Agent is always immediately available for reading by
21
21
# any thread without any messages, i.e. observation does not require
22
- # cooperation or coordination.
22
+ # cooperation or coordination.
23
23
#
24
24
# Agent action dispatches are made using the various `#send` methods. These
25
25
# methods always return immediately. At some point later, in another thread,
26
26
# the following will happen:
27
27
#
28
- # 1. The given `action` will be applied to the state of the Agent and the
28
+ # 1. The given `action` will be applied to the state of the Agent and the
29
29
# `args`, if any were supplied.
30
30
# 2. The return value of `action` will be passed to the validator lambda,
31
31
# if one has been set on the Agent.
@@ -55,7 +55,7 @@ module Concurrent
55
55
# Unlike in Clojure, `Agent` cannot participate in `Concurrent::TVar` transactions.
56
56
#
57
57
# ## Example
58
- #
58
+ #
59
59
# ```
60
60
# def next_fibonacci(set = nil)
61
61
# return [0, 1] if set.nil?
@@ -130,7 +130,7 @@ module Concurrent
130
130
# ```
131
131
#
132
132
# @!macro [new] agent_await_warning
133
- #
133
+ #
134
134
# **NOTE** Never, *under any circumstances*, call any of the "await" methods
135
135
# ({#await}, {#await_for}, {#await_for!}, and {#wait}) from within an action
136
136
# block/proc/lambda. The call will block the Agent and will always fail.
@@ -141,7 +141,7 @@ module Concurrent
141
141
#
142
142
# @see http://clojure.org/Agents Clojure Agents
143
143
# @see http://clojure.org/state Values and Change - Clojure's approach to Identity and State
144
- class Agent < Synchronization ::Object
144
+ class Agent < Synchronization ::LockableObject
145
145
include Concern ::Observable
146
146
147
147
ERROR_MODES = [ :continue , :fail ] . freeze
@@ -150,13 +150,13 @@ class Agent < Synchronization::Object
150
150
AWAIT_FLAG = Object . new
151
151
private_constant :AWAIT_FLAG
152
152
153
- AWAIT_ACTION = -> ( value , latch ) { latch . count_down ; AWAIT_FLAG }
153
+ AWAIT_ACTION = -> ( value , latch ) { latch . count_down ; AWAIT_FLAG }
154
154
private_constant :AWAIT_ACTION
155
155
156
- DEFAULT_ERROR_HANDLER = -> ( agent , error ) { nil }
156
+ DEFAULT_ERROR_HANDLER = -> ( agent , error ) { nil }
157
157
private_constant :DEFAULT_ERROR_HANDLER
158
158
159
- DEFAULT_VALIDATOR = -> ( value ) { true }
159
+ DEFAULT_VALIDATOR = -> ( value ) { true }
160
160
private_constant :DEFAULT_VALIDATOR
161
161
162
162
Job = Struct . new ( :action , :args , :executor , :caller )
@@ -226,6 +226,7 @@ def initialize(initial, opts = {})
226
226
def value
227
227
@current . value
228
228
end
229
+
229
230
alias_method :deref , :value
230
231
231
232
# When {#failed?} and {#error_mode} is `:fail`, returns the error object
@@ -236,6 +237,7 @@ def value
236
237
def error
237
238
@error . value
238
239
end
240
+
239
241
alias_method :reason , :error
240
242
241
243
# @!macro [attach] agent_send
@@ -289,6 +291,7 @@ def send!(*args, &action)
289
291
def send_off ( *args , &action )
290
292
enqueue_action_job ( action , args , Concurrent . global_io_executor )
291
293
end
294
+
292
295
alias_method :post , :send_off
293
296
294
297
# @!macro agent_send
@@ -396,6 +399,7 @@ def wait(timeout = nil)
396
399
def failed?
397
400
!@error . value . nil?
398
401
end
402
+
399
403
alias_method :stopped? , :failed?
400
404
401
405
# When an Agent is {#failed?}, changes the Agent {#value} to `new_value`
@@ -420,7 +424,7 @@ def restart(new_value, opts = {})
420
424
raise Error . new ( 'agent is not failed' ) unless failed?
421
425
raise ValidationError unless ns_validate ( new_value )
422
426
@current . value = new_value
423
- @error . value = nil
427
+ @error . value = nil
424
428
@queue . clear if clear_actions
425
429
ns_post_next_job unless @queue . empty?
426
430
end
@@ -440,7 +444,7 @@ class << self
440
444
#
441
445
# @!macro agent_await_warning
442
446
def await ( *agents )
443
- agents . each { |agent | agent . await }
447
+ agents . each { |agent | agent . await }
444
448
true
445
449
end
446
450
@@ -455,11 +459,11 @@ def await(*agents)
455
459
# @!macro agent_await_warning
456
460
def await_for ( timeout , *agents )
457
461
end_at = Concurrent . monotonic_time + timeout . to_f
458
- ok = agents . length . times do |i |
462
+ ok = agents . length . times do |i |
459
463
break false if ( delay = end_at - Concurrent . monotonic_time ) < 0
460
464
break false unless agents [ i ] . await_for ( delay )
461
465
end
462
- !! ok
466
+ !!ok
463
467
end
464
468
465
469
# Blocks the current thread until all actions dispatched thus far to all
@@ -481,7 +485,7 @@ def await_for!(timeout, *agents)
481
485
private
482
486
483
487
def ns_initialize ( initial , opts )
484
- @error_mode = opts [ :error_mode ]
488
+ @error_mode = opts [ :error_mode ]
485
489
@error_handler = opts [ :error_handler ]
486
490
487
491
if @error_mode && !ERROR_MODES . include? ( @error_mode )
@@ -491,11 +495,11 @@ def ns_initialize(initial, opts)
491
495
end
492
496
493
497
@error_handler ||= DEFAULT_ERROR_HANDLER
494
- @validator = opts . fetch ( :validator , DEFAULT_VALIDATOR )
495
- @current = Concurrent ::AtomicReference . new ( initial )
496
- @error = Concurrent ::AtomicReference . new ( nil )
497
- @caller = Concurrent ::ThreadLocalVar . new ( nil )
498
- @queue = [ ]
498
+ @validator = opts . fetch ( :validator , DEFAULT_VALIDATOR )
499
+ @current = Concurrent ::AtomicReference . new ( initial )
500
+ @error = Concurrent ::AtomicReference . new ( nil )
501
+ @caller = Concurrent ::ThreadLocalVar . new ( nil )
502
+ @queue = [ ]
499
503
500
504
self . observers = Collection ::CopyOnNotifyObserverSet . new
501
505
end
@@ -530,15 +534,15 @@ def ns_enqueue_job(job, index = nil)
530
534
end
531
535
532
536
def ns_post_next_job
533
- @queue . first . executor . post { execute_next_job }
537
+ @queue . first . executor . post { execute_next_job }
534
538
end
535
539
536
540
def execute_next_job
537
- job = synchronize { @queue . first }
541
+ job = synchronize { @queue . first }
538
542
old_value = @current . value
539
543
540
544
@caller . value = job . caller # for nested actions
541
- new_value = job . action . call ( old_value , *job . args )
545
+ new_value = job . action . call ( old_value , *job . args )
542
546
@caller . value = nil
543
547
544
548
if new_value != AWAIT_FLAG && ns_validate ( new_value )
@@ -573,7 +577,7 @@ def handle_error(error)
573
577
end
574
578
575
579
def ns_find_last_job_for_thread
576
- @queue . rindex { |job | job . caller == Thread . current . object_id }
580
+ @queue . rindex { |job | job . caller == Thread . current . object_id }
577
581
end
578
582
end
579
583
end
0 commit comments