Skip to content

Commit 1e6f16e

Browse files
committed
Add the Scalient::Concurrent::CollectionRetrofit#cancel method so that takers can receive cancellation errors from adders
1 parent f5e6429 commit 1e6f16e

File tree

2 files changed

+58
-2
lines changed

2 files changed

+58
-2
lines changed

app/concerns/scalient/concurrent/collection_retrofit.rb

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ module CollectionRetrofit
2424
if !include?(::MonitorMixin)
2525
send(:include, ::MonitorMixin)
2626
end
27+
28+
attr_accessor :_cancellation_error_class
2729
end
2830

2931
# A monitor-owned condition variable composed with a (sortable) value.
@@ -69,7 +71,10 @@ def synchronized_take(n_items_argument = nil, timeout = nil, policy: :partial_on
6971
end
7072

7173
synchronize do
72-
if size >= n_items
74+
if (cancellation_error_class = _cancellation_error_class)
75+
# Canceled? Bail.
76+
raise cancellation_error_class
77+
elsif size >= n_items
7378
# If there happen to be `n_items`, just take and return.
7479
return block.call(n_items_argument ? n_items : nil)
7580
elsif policy == :partial && size > 0
@@ -84,7 +89,13 @@ def synchronized_take(n_items_argument = nil, timeout = nil, policy: :partial_on
8489
loop do
8590
vcv.wait(timeout)
8691

87-
if size >= n_items
92+
if (cancellation_error_class = _cancellation_error_class)
93+
# Deregister the taker because the operation was cancelled.
94+
taker_priority_queue.delete(vcv)
95+
96+
# Bail after doing the above bookkeeping.
97+
raise cancellation_error_class
98+
elsif size >= n_items
8899
# Deregister the taker because we got `n_items`.
89100
taker_priority_queue.delete(vcv)
90101

@@ -139,6 +150,28 @@ def signal_takers
139150
vcv.signal
140151
end
141152
end
153+
154+
# Cancel all outstanding synchronized operations.
155+
def cancel(error_class)
156+
if !error_class.try(:<, StandardError)
157+
raise ArgumentError, "Please provide an error class"
158+
end
159+
160+
synchronize do
161+
while (vcv = taker_priority_queue.pop)
162+
vcv.signal
163+
end
164+
165+
self._cancellation_error_class = error_class
166+
end
167+
end
168+
169+
# Unset the synchronize operation cancellation status.
170+
def uncancel
171+
synchronize do
172+
self._cancellation_error_class = nil
173+
end
174+
end
142175
end
143176
end
144177
end

spec/concurrent/concurrent_array_spec.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,27 @@
111111
expect(items_4).to eq([0, 1, 2, 3, 4])
112112
expect(items_5).to eq([0, 1, 2, 3, 4])
113113
end
114+
115+
it "cancels outstanding synchronized operations" do
116+
shared_queue = ConcurrentArray.new
117+
118+
caught_error = nil
119+
120+
receiver = Thread.new do
121+
begin
122+
shared_queue.s_shift
123+
rescue ::Concurrent::CancelledOperationError => e
124+
caught_error = e
125+
end
126+
end
127+
128+
# Allow some time for the thread to enter condition variable `wait`.
129+
sleep(0.1)
130+
131+
shared_queue.cancel(::Concurrent::CancelledOperationError)
132+
133+
receiver.join
134+
135+
expect(caught_error).to be_a(::Concurrent::CancelledOperationError)
136+
end
114137
end

0 commit comments

Comments
 (0)