Skip to content

Commit 1b53506

Browse files
committed
Merge pull request #320 from ruby-concurrency/internal-reorg-collection
Moved classes to the `Collection` module
2 parents 493ec53 + b88c290 commit 1b53506

21 files changed

+561
-551
lines changed

lib/concurrent.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
require 'concurrent/configuration'
66

77
require 'concurrent/atomics'
8-
require 'concurrent/collections'
98
require 'concurrent/errors'
109
require 'concurrent/executors'
1110
require 'concurrent/utilities'

lib/concurrent/agent.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'thread'
2+
require 'concurrent/collection/copy_on_write_observer_set'
23
require 'concurrent/concern/dereferenceable'
34
require 'concurrent/concern/observable'
45
require 'concurrent/concern/logging'
@@ -95,7 +96,7 @@ def initialize(initial, opts = {})
9596
@value = initial
9697
@rescuers = []
9798
@validator = Proc.new { |result| true }
98-
self.observers = CopyOnWriteObserverSet.new
99+
self.observers = Collection::CopyOnWriteObserverSet.new
99100
@serialized_execution = SerializedExecution.new
100101
@io_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
101102
@fast_executor = Executor.executor_from_options(opts) || Concurrent.global_fast_executor

lib/concurrent/atomic/copy_on_notify_observer_set.rb

Lines changed: 0 additions & 111 deletions
This file was deleted.

lib/concurrent/atomic/copy_on_write_observer_set.rb

Lines changed: 0 additions & 115 deletions
This file was deleted.

lib/concurrent/atomics.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
require 'concurrent/atomic/atomic_boolean'
4242
require 'concurrent/atomic/atomic_fixnum'
4343
require 'concurrent/atomic/condition'
44-
require 'concurrent/atomic/copy_on_notify_observer_set'
45-
require 'concurrent/atomic/copy_on_write_observer_set'
4644
require 'concurrent/atomic/cyclic_barrier'
4745
require 'concurrent/atomic/count_down_latch'
4846
require 'concurrent/atomic/event'
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
require 'concurrent/synchronization'
2+
3+
module Concurrent
4+
module Collection
5+
6+
# A thread safe observer set implemented using copy-on-read approach:
7+
# observers are added and removed from a thread safe collection; every time
8+
# a notification is required the internal data structure is copied to
9+
# prevent concurrency issues
10+
class CopyOnNotifyObserverSet < Synchronization::Object
11+
12+
def initialize
13+
super()
14+
synchronize { ns_initialize }
15+
end
16+
17+
# Adds an observer to this set. If a block is passed, the observer will be
18+
# created by this method and no other params should be passed
19+
#
20+
# @param [Object] observer the observer to add
21+
# @param [Symbol] func the function to call on the observer during notification.
22+
# Default is :update
23+
# @return [Object] the added observer
24+
def add_observer(observer=nil, func=:update, &block)
25+
if observer.nil? && block.nil?
26+
raise ArgumentError, 'should pass observer as a first argument or block'
27+
elsif observer && block
28+
raise ArgumentError.new('cannot provide both an observer and a block')
29+
end
30+
31+
if block
32+
observer = block
33+
func = :call
34+
end
35+
36+
synchronize do
37+
@observers[observer] = func
38+
observer
39+
end
40+
end
41+
42+
# @param [Object] observer the observer to remove
43+
# @return [Object] the deleted observer
44+
def delete_observer(observer)
45+
synchronize do
46+
@observers.delete(observer)
47+
observer
48+
end
49+
end
50+
51+
# Deletes all observers
52+
# @return [CopyOnWriteObserverSet] self
53+
def delete_observers
54+
synchronize do
55+
@observers.clear
56+
self
57+
end
58+
end
59+
60+
# @return [Integer] the observers count
61+
def count_observers
62+
synchronize { @observers.count }
63+
end
64+
65+
# Notifies all registered observers with optional args
66+
# @param [Object] args arguments to be passed to each observer
67+
# @return [CopyOnWriteObserverSet] self
68+
def notify_observers(*args, &block)
69+
observers = duplicate_observers
70+
notify_to(observers, *args, &block)
71+
self
72+
end
73+
74+
# Notifies all registered observers with optional args and deletes them.
75+
#
76+
# @param [Object] args arguments to be passed to each observer
77+
# @return [CopyOnWriteObserverSet] self
78+
def notify_and_delete_observers(*args, &block)
79+
observers = duplicate_and_clear_observers
80+
notify_to(observers, *args, &block)
81+
self
82+
end
83+
84+
protected
85+
86+
def ns_initialize
87+
@observers = {}
88+
end
89+
90+
private
91+
92+
def duplicate_and_clear_observers
93+
synchronize do
94+
observers = @observers.dup
95+
@observers.clear
96+
observers
97+
end
98+
end
99+
100+
def duplicate_observers
101+
synchronize { observers = @observers.dup }
102+
end
103+
104+
def notify_to(observers, *args)
105+
raise ArgumentError.new('cannot give arguments and a block') if block_given? && !args.empty?
106+
observers.each do |observer, function|
107+
args = yield if block_given?
108+
observer.send(function, *args)
109+
end
110+
end
111+
end
112+
end
113+
end

0 commit comments

Comments
 (0)