1
- require 'thread'
2
1
require 'concurrent/collection/copy_on_write_observer_set'
3
2
require 'concurrent/concern/dereferenceable'
4
3
require 'concurrent/concern/observable'
5
4
require 'concurrent/concern/logging'
6
5
require 'concurrent/executor/executor'
6
+ require 'concurrent/synchronization'
7
7
8
8
module Concurrent
9
9
@@ -80,7 +80,7 @@ module Concurrent
80
80
# @return [Fixnum] the maximum number of seconds before an update is cancelled
81
81
#
82
82
# @!macro edge_warning
83
- class Agent
83
+ class Agent < Synchronization :: Object
84
84
include Concern ::Dereferenceable
85
85
include Concern ::Observable
86
86
include Concern ::Logging
@@ -93,15 +93,8 @@ class Agent
93
93
#
94
94
# @!macro executor_and_deref_options
95
95
def initialize ( initial , opts = { } )
96
- @value = initial
97
- @rescuers = [ ]
98
- @validator = Proc . new { |result | true }
99
- self . observers = Collection ::CopyOnWriteObserverSet . new
100
- @serialized_execution = SerializedExecution . new
101
- @io_executor = Executor . executor_from_options ( opts ) || Concurrent . global_io_executor
102
- @fast_executor = Executor . executor_from_options ( opts ) || Concurrent . global_fast_executor
103
- init_mutex
104
- set_deref_options ( opts )
96
+ super ( )
97
+ synchronize { ns_initialize ( initial , opts ) }
105
98
end
106
99
107
100
# Specifies a block fast to be performed when an update fast raises
@@ -127,17 +120,15 @@ def initialize(initial, opts = {})
127
120
# #=> puts "Pow!"
128
121
def rescue ( clazz = StandardError , &block )
129
122
unless block . nil?
130
- mutex . synchronize do
131
- @rescuers << Rescuer . new ( clazz , block )
132
- end
123
+ synchronize { @rescuers << Rescuer . new ( clazz , block ) }
133
124
end
134
125
self
135
126
end
136
127
137
128
alias_method :catch , :rescue
138
129
alias_method :on_error , :rescue
139
130
140
- # A block fast to be performed after every update to validate if the new
131
+ # A block task to be performed after every update to validate if the new
141
132
# value is valid. If the new value is not valid then the current value is not
142
133
# updated. If no validator is provided then all updates are considered valid.
143
134
#
@@ -148,12 +139,7 @@ def rescue(clazz = StandardError, &block)
148
139
def validate ( &block )
149
140
150
141
unless block . nil?
151
- begin
152
- mutex . lock
153
- @validator = block
154
- ensure
155
- mutex . unlock
156
- end
142
+ synchronize { @validator = block }
157
143
end
158
144
self
159
145
end
@@ -208,6 +194,20 @@ def await(timeout = nil)
208
194
done . wait timeout
209
195
end
210
196
197
+ protected
198
+
199
+ def ns_initialize ( initial , opts )
200
+ init_mutex ( self )
201
+ @value = initial
202
+ @rescuers = [ ]
203
+ @validator = Proc . new { |result | true }
204
+ self . observers = Collection ::CopyOnWriteObserverSet . new
205
+ @serialized_execution = SerializedExecution . new
206
+ @io_executor = Executor . executor_from_options ( opts ) || Concurrent . global_io_executor
207
+ @fast_executor = Executor . executor_from_options ( opts ) || Concurrent . global_fast_executor
208
+ set_deref_options ( opts )
209
+ end
210
+
211
211
private
212
212
213
213
def post_on ( executor , &block )
@@ -221,7 +221,7 @@ def post_on(executor, &block)
221
221
222
222
# @!visibility private
223
223
def try_rescue ( ex ) # :nodoc:
224
- rescuer = mutex . synchronize do
224
+ rescuer = synchronize do
225
225
@rescuers . find { |r | ex . is_a? ( r . clazz ) }
226
226
end
227
227
rescuer . block . call ( ex ) if rescuer
@@ -232,7 +232,7 @@ def try_rescue(ex) # :nodoc:
232
232
233
233
# @!visibility private
234
234
def work ( &handler ) # :nodoc:
235
- validator , value = mutex . synchronize { [ @validator , @value ] }
235
+ validator , value = synchronize { [ @validator , @value ] }
236
236
237
237
begin
238
238
result = handler . call ( value )
@@ -241,14 +241,11 @@ def work(&handler) # :nodoc:
241
241
exception = ex
242
242
end
243
243
244
- begin
245
- mutex . lock
246
- should_notify = if !exception && valid
247
- @value = result
248
- true
249
- end
250
- ensure
251
- mutex . unlock
244
+ should_notify = synchronize do
245
+ if !exception && valid
246
+ @value = result
247
+ true
248
+ end
252
249
end
253
250
254
251
if should_notify
0 commit comments