Skip to content

Commit 5ec1c54

Browse files
committed
Optimized CopyOnNotifyObserverSet and CopyOnWriteObserverSet
1 parent fb0bd91 commit 5ec1c54

File tree

2 files changed

+62
-29
lines changed

2 files changed

+62
-29
lines changed

lib/concurrent/atomic/copy_on_notify_observer_set.rb

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,34 +16,49 @@ def initialize
1616
# @param [Symbol] func the function to call on the observer during notification. Default is :update
1717
# @return [Symbol] the added function
1818
def add_observer(observer, func=:update)
19-
@mutex.synchronize { @observers[observer] = func }
19+
@mutex.lock
20+
@observers[observer] = func
21+
@mutex.unlock
22+
23+
func
2024
end
25+
2126
alias_method :add_watch, :add_observer
2227

2328
# @param [Object] observer the observer to remove
2429
# @return [Object] the deleted observer
2530
def delete_observer(observer)
26-
@mutex.synchronize { @observers.delete(observer) }
31+
@mutex.lock
32+
@observers.delete(observer)
33+
@mutex.unlock
34+
2735
observer
2836
end
2937

3038
# Deletes all observers
3139
# @return [CopyOnWriteObserverSet] self
3240
def delete_observers
33-
@mutex.synchronize { @observers.clear }
41+
@mutex.lock
42+
@observers.clear
43+
@mutex.unlock
44+
3445
self
3546
end
3647

3748
# @return [Integer] the observers count
3849
def count_observers
39-
@mutex.synchronize { @observers.count }
50+
@mutex.lock
51+
result = @observers.count
52+
@mutex.unlock
53+
54+
result
4055
end
4156

4257
# Notifies all registered observers with optional args
4358
# @param [Object] args arguments to be passed to each observer
4459
# @return [CopyOnWriteObserverSet] self
4560
def notify_observers(*args, &block)
46-
observers = @mutex.synchronize { @observers.dup }
61+
observers = duplicate_observers
4762
notify_to(observers, *args, &block)
4863

4964
self
@@ -63,15 +78,24 @@ def notify_and_delete_observers(*args, &block)
6378
private
6479

6580
def duplicate_and_clear_observers
66-
@mutex.synchronize do
67-
observers = @observers.dup
68-
@observers.clear
69-
observers
70-
end
81+
@mutex.lock
82+
observers = @observers.dup
83+
@observers.clear
84+
@mutex.unlock
85+
86+
observers
87+
end
88+
89+
def duplicate_observers
90+
@mutex.lock
91+
observers = @observers.dup
92+
@mutex.unlock
93+
94+
observers
7195
end
7296

7397
def notify_to(observers, *args)
74-
raise ArgumentError.new('cannot give arguments and a block') if block_given? && ! args.empty?
98+
raise ArgumentError.new('cannot give arguments and a block') if block_given? && !args.empty?
7599
observers.each do |observer, function|
76100
args = yield if block_given?
77101
observer.send(function, *args)

lib/concurrent/atomic/copy_on_write_observer_set.rb

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,24 @@ def initialize
1515
# @param [Symbol] func the function to call on the observer during notification. Default is :update
1616
# @return [Symbol] the added function
1717
def add_observer(observer, func=:update)
18-
@mutex.synchronize do
19-
new_observers = @observers.dup
20-
new_observers[observer] = func
21-
@observers = new_observers
22-
end
18+
@mutex.lock
19+
new_observers = @observers.dup
20+
new_observers[observer] = func
21+
@observers = new_observers
22+
@mutex.unlock
23+
2324
func
2425
end
2526

2627
# @param [Object] observer the observer to remove
2728
# @return [Object] the deleted observer
2829
def delete_observer(observer)
29-
@mutex.synchronize do
30-
new_observers = @observers.dup
31-
new_observers.delete(observer)
32-
@observers = new_observers
33-
end
30+
@mutex.lock
31+
new_observers = @observers.dup
32+
new_observers.delete(observer)
33+
@observers = new_observers
34+
@mutex.unlock
35+
3436
observer
3537
end
3638

@@ -68,27 +70,34 @@ def notify_and_delete_observers(*args, &block)
6870
private
6971

7072
def notify_to(observers, *args)
71-
raise ArgumentError.new('cannot give arguments and a block') if block_given? && ! args.empty?
73+
raise ArgumentError.new('cannot give arguments and a block') if block_given? && !args.empty?
7274
observers.each do |observer, function|
7375
args = yield if block_given?
7476
observer.send(function, *args)
7577
end
7678
end
7779

7880
def observers
79-
@mutex.synchronize { @observers }
81+
@mutex.lock
82+
o = @observers
83+
@mutex.unlock
84+
85+
o
8086
end
8187

8288
def observers=(new_set)
83-
@mutex.synchronize { @observers = new_set}
89+
@mutex.lock
90+
@observers = new_set
91+
@mutex.unlock
8492
end
8593

8694
def clear_observers_and_return_old
87-
@mutex.synchronize do
88-
old_observers = @observers
89-
@observers = {}
90-
old_observers
91-
end
95+
@mutex.lock
96+
old_observers = @observers
97+
@observers = {}
98+
@mutex.unlock
99+
100+
old_observers
92101
end
93102
end
94103
end

0 commit comments

Comments
 (0)