Skip to content

Commit b421874

Browse files
committed
Merge pull request #52 from jdantonio/refactor/with-ste
Implemented TimerSet executor and updated Concurrent::timer
2 parents 49906a0 + 7ce524f commit b421874

File tree

18 files changed

+1128
-110
lines changed

18 files changed

+1128
-110
lines changed

.yardopts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
--protected --private --embed-mixins --output-dir ./doc --markup markdown
1+
--protected --no-private --embed-mixins --output-dir ./doc --markup markdown

lib/concurrent.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
require 'concurrent/atomic'
55
require 'concurrent/actor'
66
require 'concurrent/channel'
7+
require 'concurrent/collection'
78
require 'concurrent/executor'
89
require 'concurrent/utilities'
910

@@ -18,6 +19,7 @@
1819
require 'concurrent/mvar'
1920
require 'concurrent/obligation'
2021
require 'concurrent/observable'
22+
require 'concurrent/options_parser'
2123
require 'concurrent/promise'
2224
require 'concurrent/runnable'
2325
require 'concurrent/scheduled_task'

lib/concurrent/agent.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
require 'thread'
22

3-
require 'concurrent/configuration'
43
require 'concurrent/dereferenceable'
54
require 'concurrent/observable'
5+
require 'concurrent/options_parser'
66
require 'concurrent/utilities'
77

88
module Concurrent

lib/concurrent/collection.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
require 'concurrent/collection/priority_queue'
Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
module Concurrent
2+
3+
# @!macro [new] priority_queue
4+
#
5+
# A queue collection in which the elements are sorted based on their
6+
# comparison (spaceship) operator `<=>`. Items are added to the queue
7+
# at a position relative to their priority. On removal the element
8+
# with the "highest" priority is removed. By default the sort order is
9+
# from highest to lowest, but a lowest-to-highest sort order can be
10+
# set on construction.
11+
#
12+
# The API is based on the `Queue` class from the Ruby standard library.
13+
#
14+
# The pure Ruby implementation, `MutexPriorityQueue` uses a heap algorithm
15+
# stored in an array. The algorithm is based on the work of Robert Sedgewick
16+
# and Kevin Wayne.
17+
#
18+
# The JRuby native implementation is a thin wrapper around the standard
19+
# library `java.util.PriorityQueue`.
20+
#
21+
# When running under JRuby the class `PriorityQueue` extends `JavaPriorityQueue`.
22+
# When running under all other interpreters it extends `MutexPriorityQueue`.
23+
#
24+
# @note This implementation is *not* thread safe and performs no blocking.
25+
#
26+
# @see http://en.wikipedia.org/wiki/Priority_queue
27+
# @see http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html
28+
#
29+
# @see http://algs4.cs.princeton.edu/24pq/index.php#2.6
30+
# @see http://algs4.cs.princeton.edu/24pq/MaxPQ.java.html
31+
#
32+
# @see http://docs.oracle.com/javase/7/docs/api/java/util/PriorityQueue.html
33+
class MutexPriorityQueue
34+
35+
# Create a new priority queue with no items.
36+
#
37+
# @param [Hash] opts the options for creating the queue
38+
# @option opts [Symbol] :order (:max) dictates the order in which items are
39+
# stored: from highest to lowest when `:max` or `:high`; from lowest to
40+
# highest when `:min` or `:low`
41+
def initialize(opts = {})
42+
order = opts.fetch(:order, :max)
43+
@comparator = [:min, :low].include?(order) ? -1 : 1
44+
clear
45+
end
46+
47+
# Removes all of the elements from this priority queue.
48+
def clear
49+
@queue = [nil]
50+
@length = 0
51+
true
52+
end
53+
54+
# Deletes all items from `self` that are equal to `item`.
55+
#
56+
# @param [Object] item the item to be removed from the queue
57+
# @return [Object] true if the item is found else false
58+
def delete(item)
59+
original_length = @length
60+
k = 1
61+
while k <= @length
62+
if @queue[k] == item
63+
swap(k, @length)
64+
@length -= 1
65+
sink(k)
66+
@queue.pop
67+
else
68+
k += 1
69+
end
70+
end
71+
@length != original_length
72+
end
73+
74+
# Returns `true` if `self` contains no elements.
75+
#
76+
# @return [Boolean] true if there are no items in the queue else false
77+
def empty?
78+
size == 0
79+
end
80+
81+
# Returns `true` if the given item is present in `self` (that is, if any
82+
# element == `item`), otherwise returns false.
83+
#
84+
# @param [Object] item the item to search for
85+
#
86+
# @return [Boolean] true if the item is found else false
87+
def include?(item)
88+
@queue.include?(item)
89+
end
90+
alias_method :has_priority?, :include?
91+
92+
# The current length of the queue.
93+
#
94+
# @return [Fixnum] the number of items in the queue
95+
def length
96+
@length
97+
end
98+
alias_method :size, :length
99+
100+
# Retrieves, but does not remove, the head of this queue, or returns `nil`
101+
# if this queue is empty.
102+
#
103+
# @return [Object] the head of the queue or `nil` when empty
104+
def peek
105+
@queue[1]
106+
end
107+
108+
# Retrieves and removes the head of this queue, or returns `nil` if this
109+
# queue is empty.
110+
#
111+
# @return [Object] the head of the queue or `nil` when empty
112+
def pop
113+
max = @queue[1]
114+
swap(1, @length)
115+
@length -= 1
116+
sink(1)
117+
@queue.pop
118+
max
119+
end
120+
alias_method :deq, :pop
121+
alias_method :shift, :pop
122+
123+
# Inserts the specified element into this priority queue.
124+
#
125+
# @param [Object] item the item to insert onto the queue
126+
def push(item)
127+
@length += 1
128+
@queue << item
129+
swim(@length)
130+
true
131+
end
132+
alias_method :<<, :push
133+
alias_method :enq, :push
134+
135+
# Create a new priority queue from the given list.
136+
#
137+
# @param [Enumerable] list the list to build the queue from
138+
# @param [Hash] opts the options for creating the queue
139+
#
140+
# @return [PriorityQueue] the newly created and populated queue
141+
def self.from_list(list, opts = {})
142+
queue = new(opts)
143+
list.each{|item| queue << item }
144+
queue
145+
end
146+
147+
protected
148+
149+
# Exchange the values at the given indexes within the internal array.
150+
#
151+
# @param [Integer] x the first index to swap
152+
# @param [Integer] y the second index to swap
153+
#
154+
# @!visibility private
155+
def swap(x, y)
156+
temp = @queue[x]
157+
@queue[x] = @queue[y]
158+
@queue[y] = temp
159+
end
160+
161+
# Are the items at the given indexes ordered based on the priority
162+
# order specified at construction?
163+
#
164+
# @param [Integer] x the first index from which to retrieve a comparable value
165+
# @param [Integer] y the second index from which to retrieve a comparable value
166+
#
167+
# @return [Boolean] true if the two elements are in the correct priority order
168+
# else false
169+
#
170+
# @!visibility private
171+
def ordered?(x, y)
172+
(@queue[x] <=> @queue[y]) == @comparator
173+
end
174+
175+
# Percolate down to maintain heap invariant.
176+
#
177+
# @param [Integer] k the index at which to start the percolation
178+
#
179+
# @!visibility private
180+
def sink(k)
181+
while (j = (2 * k)) <= @length do
182+
j += 1 if j < @length && ! ordered?(j, j+1)
183+
break if ordered?(k, j)
184+
swap(k, j)
185+
k = j
186+
end
187+
end
188+
189+
# Percolate up to maintain heap invariant.
190+
#
191+
# @param [Integer] k the index at which to start the percolation
192+
#
193+
# @!visibility private
194+
def swim(k)
195+
while k > 1 && ! ordered?(k/2, k) do
196+
swap(k, k/2)
197+
k = k/2
198+
end
199+
end
200+
end
201+
202+
if RUBY_PLATFORM == 'java'
203+
204+
# @!macro priority_queue
205+
class JavaPriorityQueue
206+
207+
# Create a new priority queue with no items.
208+
#
209+
# @param [Hash] opts the options for creating the queue
210+
# @option opts [Symbol] :order (:max) dictates the order in which items are
211+
# stored: from highest to lowest when `:max` or `:high`; from lowest to
212+
# highest when `:min` or `:low`
213+
def initialize(opts = {})
214+
order = opts.fetch(:order, :max)
215+
if [:min, :low].include?(order)
216+
@queue = java.util.PriorityQueue.new(11) # 11 is the default initial capacity
217+
else
218+
@queue = java.util.PriorityQueue.new(11, java.util.Collections.reverseOrder())
219+
end
220+
end
221+
222+
# Removes all of the elements from this priority queue.
223+
def clear
224+
@queue.clear
225+
true
226+
end
227+
228+
# Deletes all items from `self` that are equal to `item`.
229+
#
230+
# @param [Object] item the item to be removed from the queue
231+
# @return [Object] true if the item is found else false
232+
def delete(item)
233+
found = false
234+
while @queue.remove(item) do
235+
found = true
236+
end
237+
found
238+
end
239+
240+
# Returns `true` if `self` contains no elements.
241+
#
242+
# @return [Boolean] true if there are no items in the queue else false
243+
def empty?
244+
@queue.size == 0
245+
end
246+
247+
# Returns `true` if the given item is present in `self` (that is, if any
248+
# element == `item`), otherwise returns false.
249+
#
250+
# @param [Object] item the item to search for
251+
#
252+
# @return [Boolean] true if the item is found else false
253+
def include?(item)
254+
@queue.contains(item)
255+
end
256+
alias_method :has_priority?, :include?
257+
258+
# The current length of the queue.
259+
#
260+
# @return [Fixnum] the number of items in the queue
261+
def length
262+
@queue.size
263+
end
264+
alias_method :size, :length
265+
266+
# Retrieves, but does not remove, the head of this queue, or returns `nil`
267+
# if this queue is empty.
268+
#
269+
# @return [Object] the head of the queue or `nil` when empty
270+
def peek
271+
@queue.peek
272+
end
273+
274+
# Retrieves and removes the head of this queue, or returns `nil` if this
275+
# queue is empty.
276+
#
277+
# @return [Object] the head of the queue or `nil` when empty
278+
def pop
279+
@queue.poll
280+
end
281+
alias_method :deq, :pop
282+
alias_method :shift, :pop
283+
284+
# Inserts the specified element into this priority queue.
285+
#
286+
# @param [Object] item the item to insert onto the queue
287+
def push(item)
288+
@queue.add(item)
289+
end
290+
alias_method :<<, :push
291+
alias_method :enq, :push
292+
293+
# Create a new priority queue from the given list.
294+
#
295+
# @param [Enumerable] list the list to build the queue from
296+
# @param [Hash] opts the options for creating the queue
297+
#
298+
# @return [PriorityQueue] the newly created and populated queue
299+
def self.from_list(list, opts = {})
300+
queue = new(opts)
301+
list.each{|item| queue << item }
302+
queue
303+
end
304+
end
305+
306+
# @!macro priority_queue
307+
class PriorityQueue < JavaPriorityQueue
308+
end
309+
else
310+
311+
# @!macro priority_queue
312+
class PriorityQueue < MutexPriorityQueue
313+
end
314+
end
315+
end

lib/concurrent/configuration.rb

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def global_operation_pool
6666
#
6767
# @see Concurrent::timer
6868
def global_timer_pool
69-
@global_timer_pool ||= Concurrent::CachedThreadPool.new
69+
@global_timer_pool ||= Concurrent::TimerSet.new
7070
end
7171

7272
# Global thread pool optimized for short *tasks*.
@@ -108,28 +108,6 @@ def global_operation_pool=(executor)
108108
end
109109
end
110110

111-
# A mixin module for parsing options hashes related to gem-level configuration.
112-
module OptionsParser
113-
114-
# Get the requested `Executor` based on the values set in the options hash.
115-
#
116-
# @param [Hash] opts the options defining the requested executor
117-
# @option opts [Executor] :executor (`nil`) when set use the given `Executor` instance
118-
# @option opts [Boolean] :operation (`false`) when true use the global operation pool
119-
# @option opts [Boolean] :task (`true`) when true use the global task pool
120-
#
121-
# @return [Executor] the requested thread pool (default: global task pool)
122-
def get_executor_from(opts = {})
123-
if opts[:executor]
124-
opts[:executor]
125-
elsif opts[:operation] == true || opts[:task] == false
126-
Concurrent.configuration.global_operation_pool
127-
else
128-
Concurrent.configuration.global_task_pool
129-
end
130-
end
131-
end
132-
133111
private
134112

135113
# Attempt to properly shutdown the given executor using the `shutdown` or

lib/concurrent/executor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
require 'concurrent/executor/safe_task_executor'
66
require 'concurrent/executor/single_thread_executor'
77
require 'concurrent/executor/thread_pool_executor'
8+
require 'concurrent/executor/timer_set'

0 commit comments

Comments
 (0)