Skip to content

Commit 799cfe6

Browse files
committed
Merge remote-tracking branch 'upstream/master' into actress
* upstream/master: (13 commits) Temporarily adding Rbx to Travis allowed failure list due to problem with the Travis service. ... Conflicts: lib/concurrent/atomic/thread_local_var.rb
2 parents 345bd4b + bc8dc63 commit 799cfe6

19 files changed

+114
-140
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ rdoc/*
2020
*.REMOTE.*
2121
git_pull.txt
2222
coverage
23+
critic
2324
.DS_Store
2425
TAGS
2526
tmtags

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ branches:
1414
- master
1515
matrix:
1616
allow_failures:
17+
- rvm: rbx-2
1718
- rvm: ruby-head
1819
- rvm: jruby-head
1920
- rvm: 1.9.3

Gemfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ end
1616

1717
group :documentation do
1818
gem 'countloc', '~> 0.4.0', :platforms => :mri, :require => false
19+
gem 'rubycritic', '~> 1.0.2', :platforms => :mri, require: false
1920
gem 'yard', '~> 0.8.7.4', :require => false
2021
gem 'inch', '~> 0.4.6', :platforms => :mri, :require => false
2122
gem 'redcarpet', '~> 3.1.2', platforms: :mri # understands github markdown

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ _NOTE: There is an old gem from 2007 called "concurrent" that does not appear to
5454
## Features & Documentation
5555

5656
Please see the [Concurrent Ruby Wiki](https://github.com/ruby-concurrency/concurrent-ruby/wiki)
57-
or the [API documentation](http://ruby-concurrency.github.io/concurrent-ruby/frames.html))
57+
or the [API documentation](http://ruby-concurrency.github.io/concurrent-ruby/frames.html)
5858
for more information or join our [mailing list](http://groups.google.com/group/concurrent-ruby).
5959

6060
There are many concurrency abstractions in this library. These abstractions can be broadly categorized
@@ -146,6 +146,7 @@ task.value #=> 25.96
146146
* [Chip Miller](https://github.com/chip-miller)
147147
* [Giuseppe Capizzi](https://github.com/gcapizzi)
148148
* [Jamie Hodge](https://github.com/jamiehodge)
149+
* [Justin Lambert](https://github.com/mastfish)
149150
* [Larry Lv](https://github.com/larrylv)
150151
* [Maxim Chechel](https://github.com/maximchick)
151152
* [Ravil Bayramgalin](https://github.com/brainopia)

lib/concurrent/agent.rb

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,13 @@ class Agent
3838
include Concurrent::Observable
3939
include Logging
4040

41-
# The default timeout value (in seconds); used when no timeout option
42-
# is given at initialization
43-
TIMEOUT = 5
44-
4541
attr_reader :timeout, :task_executor, :operation_executor
4642

4743
# Initialize a new Agent with the given initial value and provided options.
4844
#
4945
# @param [Object] initial the initial value
5046
# @param [Hash] opts the options used to define the behavior at update and deref
5147
#
52-
# @option opts [Fixnum] :timeout (TIMEOUT) maximum number of seconds before an update is cancelled
53-
#
5448
# @option opts [Boolean] :operation (false) when `true` will execute the future on the global
5549
# operation pool (for long-running operations), when `false` will execute the future on the
5650
# global task pool (for short-running tasks)
@@ -65,7 +59,6 @@ def initialize(initial, opts = {})
6559
@value = initial
6660
@rescuers = []
6761
@validator = Proc.new { |result| true }
68-
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
6962
self.observers = CopyOnWriteObserverSet.new
7063
@serialized_execution = SerializedExecution.new
7164
@task_executor = OptionsParser.get_task_executor_from(opts)
@@ -145,12 +138,19 @@ def post(&block)
145138
# Update the current value with the result of the given block operation,
146139
# block can do blocking calls
147140
#
141+
# @param [Fixnum, nil] timeout maximum number of seconds before an update is cancelled
142+
#
148143
# @yield the operation to be performed with the current value in order to calculate
149144
# the new value
150145
# @yieldparam [Object] value the current value
151146
# @yieldreturn [Object] the new value
152147
# @return [true, nil] nil when no block is given
153-
def post_off(&block)
148+
def post_off(timeout = nil, &block)
149+
block = if timeout
150+
lambda { |value| Concurrent::timeout(timeout) { block.call(value) } }
151+
else
152+
block
153+
end
154154
post_on(@operation_executor, &block)
155155
end
156156

@@ -203,10 +203,8 @@ def work(&handler) # :nodoc:
203203
validator, value = mutex.synchronize { [@validator, @value] }
204204

205205
begin
206-
result, valid = Concurrent::timeout(@timeout) do
207-
result = handler.call(value)
208-
[result, validator.call(result)]
209-
end
206+
result = handler.call(value)
207+
valid = validator.call(result)
210208
rescue Exception => ex
211209
exception = ex
212210
end

lib/concurrent/atomic/thread_local_var.rb

Lines changed: 6 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,19 @@
1-
require 'concurrent/atomic/atomic_fixnum'
1+
require 'concurrent/atomic'
22

33
module Concurrent
44

5-
module ThreadLocalSymbolAllocator
6-
7-
COUNTER = Concurrent::AtomicFixnum.new
8-
9-
protected
10-
11-
def allocate_symbol
12-
# Warning: this symbol may never be deallocated
13-
@symbol = :"thread_local_symbol_#{COUNTER.increment}"
14-
end
15-
16-
end
17-
18-
module ThreadLocalOldStorage
19-
20-
include ThreadLocalSymbolAllocator
21-
22-
protected
23-
24-
def allocate_storage
25-
allocate_symbol
26-
end
27-
28-
def get
29-
Thread.current[@symbol]
30-
end
31-
32-
def set(value)
33-
Thread.current[@symbol] = value
34-
end
35-
36-
end
37-
38-
module ThreadLocalNewStorage
39-
40-
include ThreadLocalSymbolAllocator
41-
42-
protected
5+
module ThreadLocalRubyStorage
436

447
def allocate_storage
45-
allocate_symbol
8+
@storage = Atomic.new Hash.new
469
end
4710

4811
def get
49-
Thread.current.thread_variable_get(@symbol)
12+
@storage.get[Thread.current]
5013
end
5114

5215
def set(value)
53-
Thread.current.thread_variable_set(@symbol, value)
16+
@storage.update { |s| s.merge Thread.current => value }
5417
end
5518

5619
end
@@ -111,10 +74,8 @@ def value=(value)
11174
class ThreadLocalVar < AbstractThreadLocalVar
11275
if RUBY_PLATFORM == 'java'
11376
include ThreadLocalJavaStorage
114-
elsif Thread.current.respond_to?(:thread_variable_set)
115-
include ThreadLocalNewStorage
11677
else
117-
include ThreadLocalOldStorage
78+
include ThreadLocalRubyStorage
11879
end
11980
end
12081

lib/concurrent/dataflow.rb

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,34 @@ def update(time, value, reason)
6161
# @raise [ArgumentError] if no block is given
6262
# @raise [ArgumentError] if any of the inputs are not `IVar`s
6363
def dataflow(*inputs, &block)
64-
dataflow_with(Concurrent.configuration.global_task_pool, *inputs, &block)
64+
dataflow_with(Concurrent.configuration.global_operation_pool, *inputs, &block)
6565
end
6666
module_function :dataflow
6767

6868
def dataflow_with(executor, *inputs, &block)
69+
call_dataflow(:value, executor, *inputs, &block)
70+
end
71+
module_function :dataflow_with
72+
73+
def dataflow!(*inputs, &block)
74+
dataflow_with!(Concurrent.configuration.global_task_pool, *inputs, &block)
75+
end
76+
module_function :dataflow!
77+
78+
def dataflow_with!(executor, *inputs, &block)
79+
call_dataflow(:value!, executor, *inputs, &block)
80+
end
81+
module_function :dataflow_with!
82+
83+
private
84+
85+
def call_dataflow(method, executor, *inputs, &block)
6986
raise ArgumentError.new('an executor must be provided') if executor.nil?
7087
raise ArgumentError.new('no block given') unless block_given?
7188
raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }
7289

7390
result = Future.new(executor: executor) do
74-
values = inputs.map { |input| input.value }
91+
values = inputs.map { |input| input.send(method) }
7592
block.call(*values)
7693
end
7794

@@ -87,5 +104,5 @@ def dataflow_with(executor, *inputs, &block)
87104

88105
result
89106
end
90-
module_function :dataflow_with
107+
module_function :call_dataflow
91108
end

lib/concurrent/executor/timer_set.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class TimerSet
2323
# this executor rather than the global thread pool (overrides :operation)
2424
def initialize(opts = {})
2525
@queue = PriorityQueue.new(order: :min)
26-
@task_executor = OptionsParser::get_executor_from(opts)
26+
@task_executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_task_pool
2727
@timer_executor = SingleThreadExecutor.new
2828
@condition = Condition.new
2929
init_executor

lib/concurrent/future.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def initialize(opts = {}, &block)
6262
super(IVar::NO_VALUE, opts)
6363
@state = :unscheduled
6464
@task = block
65-
@executor = OptionsParser::get_executor_from(opts)
65+
@executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool
6666
end
6767

6868
# Execute an `:unscheduled` `Future`. Immediately sets the state to `:pending` and

lib/concurrent/options_parser.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ module OptionsParser
1010
# @option opts [Boolean] :operation (`false`) when true use the global operation pool
1111
# @option opts [Boolean] :task (`true`) when true use the global task pool
1212
#
13-
# @return [Executor] the requested thread pool (default: global task pool)
13+
# @return [Executor, nil] the requested thread pool, or nil when no option specified
1414
def get_executor_from(opts = {})
1515
if opts[:executor]
1616
opts[:executor]
1717
elsif opts[:operation] == true || opts[:task] == false
1818
Concurrent.configuration.global_operation_pool
19-
else
19+
elsif opts[:operation] == false || opts[:task] == true
2020
Concurrent.configuration.global_task_pool
21+
else
22+
nil
2123
end
2224
end
2325

0 commit comments

Comments
 (0)