Skip to content

Commit ef88134

Browse files
committed
Observer sets now extend Synchronization::Object
1 parent 58a2f53 commit ef88134

File tree

2 files changed

+48
-67
lines changed

2 files changed

+48
-67
lines changed

lib/concurrent/atomic/copy_on_notify_observer_set.rb

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
1+
require 'concurrent/synchronization'
2+
13
module Concurrent
24

35
# A thread safe observer set implemented using copy-on-read approach:
46
# observers are added and removed from a thread safe collection; every time
57
# a notification is required the internal data structure is copied to
68
# prevent concurrency issues
7-
class CopyOnNotifyObserverSet
8-
9-
def initialize
10-
@mutex = Mutex.new
11-
@observers = {}
12-
end
9+
class CopyOnNotifyObserverSet < Synchronization::Object
1310

1411
# Adds an observer to this set. If a block is passed, the observer will be
1512
# created by this method and no other params should be passed
@@ -30,41 +27,33 @@ def add_observer(observer=nil, func=:update, &block)
3027
func = :call
3128
end
3229

33-
begin
34-
@mutex.lock
30+
synchronize do
3531
@observers[observer] = func
3632
observer
37-
ensure
38-
@mutex.unlock
3933
end
4034
end
4135

4236
# @param [Object] observer the observer to remove
4337
# @return [Object] the deleted observer
4438
def delete_observer(observer)
45-
@mutex.lock
46-
@observers.delete(observer)
47-
observer
48-
ensure
49-
@mutex.unlock
39+
synchronize do
40+
@observers.delete(observer)
41+
observer
42+
end
5043
end
5144

5245
# Deletes all observers
5346
# @return [CopyOnWriteObserverSet] self
5447
def delete_observers
55-
@mutex.lock
56-
@observers.clear
57-
self
58-
ensure
59-
@mutex.unlock
48+
synchronize do
49+
@observers.clear
50+
self
51+
end
6052
end
6153

6254
# @return [Integer] the observers count
6355
def count_observers
64-
@mutex.lock
65-
@observers.count
66-
ensure
67-
@mutex.unlock
56+
synchronize { @observers.count }
6857
end
6958

7059
# Notifies all registered observers with optional args
@@ -86,23 +75,24 @@ def notify_and_delete_observers(*args, &block)
8675
self
8776
end
8877

78+
protected
79+
80+
def ns_initialize
81+
@observers = {}
82+
end
83+
8984
private
9085

9186
def duplicate_and_clear_observers
92-
@mutex.lock
93-
observers = @observers.dup
94-
@observers.clear
95-
observers
96-
ensure
97-
@mutex.unlock
87+
synchronize do
88+
observers = @observers.dup
89+
@observers.clear
90+
observers
91+
end
9892
end
9993

10094
def duplicate_observers
101-
@mutex.lock
102-
observers = @observers.dup
103-
observers
104-
ensure
105-
@mutex.unlock
95+
synchronize { observers = @observers.dup }
10696
end
10797

10898
def notify_to(observers, *args)

lib/concurrent/atomic/copy_on_write_observer_set.rb

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
1+
require 'concurrent/synchronization'
2+
13
module Concurrent
24

35
# A thread safe observer set implemented using copy-on-write approach:
46
# every time an observer is added or removed the whole internal data structure is
57
# duplicated and replaced with a new one.
6-
class CopyOnWriteObserverSet
7-
8-
def initialize
9-
@mutex = Mutex.new
10-
@observers = {}
11-
end
8+
class CopyOnWriteObserverSet < Synchronization::Object
129

1310
# Adds an observer to this set
1411
# If a block is passed, the observer will be created by this method and no
@@ -29,27 +26,23 @@ def add_observer(observer=nil, func=:update, &block)
2926
func = :call
3027
end
3128

32-
begin
33-
@mutex.lock
29+
synchronize do
3430
new_observers = @observers.dup
3531
new_observers[observer] = func
3632
@observers = new_observers
3733
observer
38-
ensure
39-
@mutex.unlock
4034
end
4135
end
4236

4337
# @param [Object] observer the observer to remove
4438
# @return [Object] the deleted observer
4539
def delete_observer(observer)
46-
@mutex.lock
47-
new_observers = @observers.dup
48-
new_observers.delete(observer)
49-
@observers = new_observers
50-
observer
51-
ensure
52-
@mutex.unlock
40+
synchronize do
41+
new_observers = @observers.dup
42+
new_observers.delete(observer)
43+
@observers = new_observers
44+
observer
45+
end
5346
end
5447

5548
# Deletes all observers
@@ -59,7 +52,6 @@ def delete_observers
5952
self
6053
end
6154

62-
6355
# @return [Integer] the observers count
6456
def count_observers
6557
observers.count
@@ -83,6 +75,12 @@ def notify_and_delete_observers(*args, &block)
8375
self
8476
end
8577

78+
protected
79+
80+
def ns_initialize
81+
@observers = {}
82+
end
83+
8684
private
8785

8886
def notify_to(observers, *args)
@@ -94,26 +92,19 @@ def notify_to(observers, *args)
9492
end
9593

9694
def observers
97-
@mutex.lock
98-
@observers
99-
ensure
100-
@mutex.unlock
95+
synchronize { @observers }
10196
end
10297

10398
def observers=(new_set)
104-
@mutex.lock
105-
@observers = new_set
106-
ensure
107-
@mutex.unlock
99+
synchronize { @observers = new_set }
108100
end
109101

110102
def clear_observers_and_return_old
111-
@mutex.lock
112-
old_observers = @observers
113-
@observers = {}
114-
old_observers
115-
ensure
116-
@mutex.unlock
103+
synchronize do
104+
old_observers = @observers
105+
@observers = {}
106+
old_observers
107+
end
117108
end
118109
end
119110
end

0 commit comments

Comments
 (0)