Skip to content

Commit 9fcc8a4

Browse files
committed
New observers work better with Dereferenceable.
* CopyOnNotifyObserverSet#notify_observers now accepts a block * CopyOnNotifyObserverSet#notify_and_delete_observers now accepts a block * CopyOnWriteObserverSet#notify_observers now accepts a block * CopyOnWriteObserverSet#notify_and_delete_observers now accepts a block * Agent observer notification uses blocks to better support Dereferenceable * IVar observer notification uses blocks to better support Dereferenceable * ScheduledTask observer notification uses blocks to better support Dereferenceable
1 parent 70411fe commit 9fcc8a4

File tree

6 files changed

+85
-40
lines changed

6 files changed

+85
-40
lines changed

lib/concurrent/agent.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ def work(&handler) # :nodoc:
170170
should_notify = true
171171
end
172172
end
173-
@observers.notify_observers(Time.now, self.value) if should_notify
173+
time = Time.now
174+
@observers.notify_observers{ [time, self.value] } if should_notify
174175
rescue Exception => ex
175176
try_rescue(ex)
176177
end

lib/concurrent/copy_on_notify_observer_set.rb

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ def count_observers
4141
# Notifies all registered observers with optional args
4242
# @param [Object] args arguments to be passed to each observer
4343
# @return [CopyOnWriteObserverSet] self
44-
def notify_observers(*args)
44+
def notify_observers(*args, &block)
4545
observers = @mutex.synchronize { @observers.dup }
46-
notify_to(observers, *args)
46+
notify_to(observers, *args, &block)
4747

4848
self
4949
end
@@ -52,28 +52,32 @@ def notify_observers(*args)
5252
#
5353
# @param [Object] args arguments to be passed to each observer
5454
# @return [CopyOnWriteObserverSet] self
55-
def notify_and_delete_observers(*args)
55+
def notify_and_delete_observers(*args, &block)
5656
observers = duplicate_and_clear_observers
57-
notify_to(observers, *args)
57+
notify_to(observers, *args, &block)
5858

5959
self
6060
end
6161

6262
private
6363

64-
def duplicate_and_clear_observers
65-
@mutex.synchronize do
66-
observers = @observers.dup
67-
@observers.clear
68-
observers
69-
end
64+
def duplicate_and_clear_observers
65+
@mutex.synchronize do
66+
observers = @observers.dup
67+
@observers.clear
68+
observers
7069
end
70+
end
7171

72-
def notify_to(observers, *args)
73-
observers.each do |observer, function|
74-
observer.send function, *args
72+
def notify_to(observers, *args)
73+
raise ArgumentError.new('cannot give arguments and a block') if block_given? && ! args.empty?
74+
observers.each do |observer, function|
75+
if block_given?
76+
observer.send(function, *[yield].flatten)
77+
else
78+
observer.send(function, *args)
7579
end
7680
end
77-
81+
end
7882
end
79-
end
83+
end

lib/concurrent/copy_on_write_observer_set.rb

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -50,44 +50,48 @@ def count_observers
5050
# Notifies all registered observers with optional args
5151
# @param [Object] args arguments to be passed to each observer
5252
# @return [CopyOnWriteObserverSet] self
53-
def notify_observers(*args)
54-
notify_to(observers, *args)
53+
def notify_observers(*args, &block)
54+
notify_to(observers, *args, &block)
5555
self
5656
end
5757

5858
# Notifies all registered observers with optional args and deletes them.
5959
#
6060
# @param [Object] args arguments to be passed to each observer
6161
# @return [CopyOnWriteObserverSet] self
62-
def notify_and_delete_observers(*args)
62+
def notify_and_delete_observers(*args, &block)
6363
old = clear_observers_and_return_old
64-
notify_to(old, *args)
64+
notify_to(old, *args, &block)
6565
self
6666
end
6767

6868
private
6969

70-
def notify_to(observers, *args)
71-
observers.each do |observer, function|
72-
observer.send function, *args
70+
def notify_to(observers, *args)
71+
raise ArgumentError.new('cannot give arguments and a block') if block_given? && ! args.empty?
72+
observers.each do |observer, function|
73+
if block_given?
74+
observer.send(function, *[yield].flatten)
75+
else
76+
observer.send(function, *args)
7377
end
7478
end
79+
end
7580

76-
def observers
77-
@mutex.synchronize { @observers }
78-
end
81+
def observers
82+
@mutex.synchronize { @observers }
83+
end
7984

80-
def observers=(new_set)
81-
@mutex.synchronize { @observers = new_set}
82-
end
85+
def observers=(new_set)
86+
@mutex.synchronize { @observers = new_set}
87+
end
8388

84-
def clear_observers_and_return_old
85-
@mutex.synchronize do
86-
old_observers = @observers
87-
@observers = {}
88-
old_observers
89-
end
89+
def clear_observers_and_return_old
90+
@mutex.synchronize do
91+
old_observers = @observers
92+
@observers = {}
93+
old_observers
9094
end
91-
95+
end
9296
end
93-
end
97+
end

lib/concurrent/ivar.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ def complete(success, value, reason)
7171
event.set
7272
end
7373

74-
@observers.notify_and_delete_observers(Time.now, value, reason)
74+
time = Time.now
75+
@observers.notify_and_delete_observers{ [time, self.value, reason] }
7576
end
76-
7777
end
7878
end

lib/concurrent/scheduled_task.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ def work
7575
event.set
7676
end
7777

78-
@observers.notify_and_delete_observers(Time.now, self.value, reason)
78+
time = Time.now
79+
@observers.notify_and_delete_observers{ [time, self.value, reason] }
7980
end
8081

8182
end

spec/concurrent/observer_set_shared.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,41 @@
8484
observer_set.notify_observers(4, 'a')
8585
end
8686
end
87+
88+
context 'with a block' do
89+
90+
before(:each) do
91+
observer.stub(:update).with(any_args)
92+
another_observer.stub(:update).with(any_args)
93+
end
94+
95+
it 'calls the block once for every observer' do
96+
97+
counter = double('block call counter')
98+
expect(counter).to receive(:called).with(no_args).exactly(2).times
99+
100+
observer_set.add_observer(observer)
101+
observer_set.add_observer(another_observer)
102+
103+
observer_set.notify_observers{ counter.called }
104+
end
105+
106+
it 'passes the block return value to the update method' do
107+
108+
expect(observer).to receive(:update).with(1, 2, 3, 4)
109+
observer_set.add_observer(observer)
110+
observer_set.notify_observers{ [1, 2, 3, 4] }
111+
end
112+
113+
it 'raises an exception if given both arguments and a block' do
114+
115+
observer_set.add_observer(observer)
116+
117+
expect {
118+
observer_set.notify_observers(1, 2, 3, 4){ nil }
119+
}.to raise_error(ArgumentError)
120+
end
121+
end
87122
end
88123

89124
context '#count_observers' do

0 commit comments

Comments
 (0)