Skip to content

Commit 988ed27

Browse files
committed
Improvements to TimerSet#cancel.
1 parent 9498bfc commit 988ed27

File tree

4 files changed

+37
-20
lines changed

4 files changed

+37
-20
lines changed

.yardopts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
--protected
21
--no-private
32
--embed-mixins
43
--output-dir ./yardoc

lib/concurrent/collection/priority_queue.rb

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ module Concurrent
2121
# When running under JRuby the class `PriorityQueue` extends `JavaPriorityQueue`.
2222
# When running under all other interpreters it extends `MutexPriorityQueue`.
2323
#
24-
# @note This implementation is *not* thread safe and performs no blocking.
24+
# @note This implementation is *not* thread safe.
2525
#
2626
# @see http://en.wikipedia.org/wiki/Priority_queue
2727
# @see http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html
@@ -305,6 +305,16 @@ def self.from_list(list, opts = {})
305305
# @!macro priority_queue
306306
class PriorityQueue < PriorityQueueImplementation
307307

308+
alias_method :has_priority?, :include?
309+
310+
alias_method :size, :length
311+
312+
alias_method :deq, :pop
313+
alias_method :shift, :pop
314+
315+
alias_method :<<, :push
316+
alias_method :enq, :push
317+
308318
# @!method initialize(opts = {})
309319
# @!macro priority_queue_method_initialize
310320

@@ -334,6 +344,5 @@ class PriorityQueue < PriorityQueueImplementation
334344

335345
# @!method self.from_list(list, opts = {})
336346
# @!macro priority_queue_method_from_list
337-
338347
end
339348
end

lib/concurrent/executor/timer_set.rb

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,14 @@ module Concurrent
1616
# @!macro monotonic_clock_warning
1717
class TimerSet < RubyExecutorService
1818

19-
# A class for encapsulating a task and its intended execution time.
20-
# It facilitates proper prioritization by overriding the comparison
21-
# (spaceship) operator as a comparison of the intended execution
22-
# times.
23-
#
24-
# @!visibility private
19+
# An `IVar` representing a tasked queued for execution in a `TimerSet`.
2520
class Task < Concurrent::IVar
2621
include Comparable
2722

28-
# @!visibility private
29-
def initialize(time, args, task)
23+
def initialize(parent, time, args, task)
3024
super()
3125
synchronize do
26+
@parent = parent
3227
@time = time
3328
@args = args
3429
@task = task
@@ -40,20 +35,20 @@ def time
4035
synchronize { @time }
4136
end
4237

43-
# @!visibility private
4438
def <=>(other)
4539
self.time <=> other.time
4640
end
4741

48-
# @!visibility private
4942
def cancelled?
5043
state == :cancelled
5144
end
5245

53-
# @!visibility private
5446
def cancel
5547
if compare_and_set_state(:cancelled, :pending)
5648
complete(false, nil, CancelledOperationError.new)
49+
# To avoid deadlocks this call must occur outside of #synchronize
50+
# Changing the state above should prevent redundant calls
51+
@parent.send(:remove_task, self)
5752
true
5853
else
5954
false
@@ -64,8 +59,9 @@ def cancel
6459
def execute
6560
safe_execute(@task, @args)
6661
end
62+
63+
protected :set, :try_set
6764
end
68-
private_constant :Task
6965

7066
# Create a new set of timed tasks.
7167
#
@@ -88,7 +84,8 @@ def initialize(opts = {})
8884
#
8985
# @yield the task to be performed
9086
#
91-
# @return [Boolean] true if the message is post, false after shutdown
87+
# @return [Concurrent::TimerSet::Task, false] IVar representing the task if the post
88+
# is successful; false after shutdown
9289
#
9390
# @raise [ArgumentError] if the intended execution time is not in the future
9491
# @raise [ArgumentError] if no block is given
@@ -102,7 +99,7 @@ def post(delay, *args, &task)
10299
return false unless running?
103100

104101
time = Concurrent.monotonic_time + delay
105-
task = Task.new(time, args, task)
102+
task = Task.new(self, time, args, task)
106103

107104
if (delay) <= 0.01
108105
@task_executor.post{ task.execute }
@@ -152,6 +149,18 @@ def self.calculate_delay!(delay)
152149

153150
protected
154151

152+
# Remove the given task from the queue.
153+
#
154+
# @note This is intended as a callback method from Task only.
155+
# It is not intended to be used directly. Cancel a task by
156+
# using the `Task#cancel` method.
157+
#
158+
# @!visibility private
159+
def remove_task(task)
160+
synchronize{ @queue.delete(task) }
161+
end
162+
163+
# @!visibility private
155164
def ns_initialize(opts)
156165
@queue = PriorityQueue.new(order: :min)
157166
@task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
@@ -204,9 +213,9 @@ def process_tasks
204213

205214
private
206215

216+
# @!visibility private
207217
def <<(task)
208-
post(0.0, &task)
209-
self
218+
raise NotImplementedError.new
210219
end
211220
end
212221
end

lib/concurrent/utility/timer.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ module Concurrent
99
#
1010
# @yield the task to execute
1111
#
12-
# @return [Boolean] true
12+
# @return [Concurrent::TimerSet::Task] IVar representing the task
1313
#
1414
# @!macro monotonic_clock_warning
1515
def timer(seconds, *args, &block)

0 commit comments

Comments
 (0)