Skip to content

Commit f84d0ab

Browse files
committed
Rename OneByOne to SerializedExecution add basic rejection handling
1 parent 430a805 commit f84d0ab

File tree

6 files changed

+41
-28
lines changed

6 files changed

+41
-28
lines changed

lib/concurrent/actress.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
require 'concurrent/configuration'
2-
require 'concurrent/executor/one_by_one'
2+
require 'concurrent/executor/serialized_execution'
33
require 'concurrent/ivar'
44
require 'concurrent/logging'
55

lib/concurrent/actress/core.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ class Core
2323
# can be used to hook actor instance to any logging system
2424
# @param [Proc] block for class instantiation
2525
def initialize(opts = {}, &block)
26-
@mailbox = Array.new
27-
@one_by_one = OneByOne.new
26+
@mailbox = Array.new
27+
@serialized_execution = SerializedExecution.new
2828
# noinspection RubyArgCount
29-
@terminated = Event.new
30-
@executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor
31-
@children = Set.new
32-
@reference = Reference.new self
33-
@name = (Type! opts.fetch(:name), String, Symbol).to_s
29+
@terminated = Event.new
30+
@executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor
31+
@children = Set.new
32+
@reference = Reference.new self
33+
@name = (Type! opts.fetch(:name), String, Symbol).to_s
3434

3535
parent = opts[:parent]
3636
@parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
@@ -180,7 +180,7 @@ def receive_envelope
180180
# Schedules blocks to be executed on executor sequentially,
181181
# sets Actress.current
182182
def schedule_execution
183-
@one_by_one.post(@executor) do
183+
@serialized_execution.post(@executor) do
184184
begin
185185
Thread.current[:__current_actress__] = reference
186186
yield

lib/concurrent/agent.rb

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ class Agent
6262
# @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
6363
# returning the value returned from the proc
6464
def initialize(initial, opts = {})
65-
@value = initial
66-
@rescuers = []
67-
@validator = Proc.new { |result| true }
68-
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
69-
self.observers = CopyOnWriteObserverSet.new
70-
@one_by_one = OneByOne.new
71-
@task_executor = OptionsParser.get_task_executor_from(opts)
72-
@operation_executor = OptionsParser.get_operation_executor_from(opts)
65+
@value = initial
66+
@rescuers = []
67+
@validator = Proc.new { |result| true }
68+
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
69+
self.observers = CopyOnWriteObserverSet.new
70+
@serialized_execution = SerializedExecution.new
71+
@task_executor = OptionsParser.get_task_executor_from(opts)
72+
@operation_executor = OptionsParser.get_operation_executor_from(opts)
7373
init_mutex
7474
set_deref_options(opts)
7575
end
@@ -180,7 +180,7 @@ def await(timeout = nil)
180180

181181
def post_on(executor, &block)
182182
return nil if block.nil?
183-
@one_by_one.post(executor) { work(&block) }
183+
@serialized_execution.post(executor) { work(&block) }
184184
true
185185
end
186186

lib/concurrent/executor/one_by_one.rb renamed to lib/concurrent/executor/serialized_execution.rb

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
require 'concurrent/logging'
2+
13
module Concurrent
24

3-
# Ensures that jobs are passed to the given executors one by one,
4-
# never running at the same time.
5-
class OneByOne
5+
# Ensures passed jobs in a serialized order never running at the same time.
6+
class SerializedExecution
7+
include Logging
68

79
Job = Struct.new(:executor, :args, :block) do
810
def call
@@ -30,10 +32,6 @@ def initialize
3032
# @raise [ArgumentError] if no task is given
3133
def post(executor, *args, &task)
3234
return nil if task.nil?
33-
# FIXME Agent#send-off will blow up here
34-
# if executor.can_overflow?
35-
# raise ArgumentError, 'OneByOne cannot be used in conjunction with executor which may overflow'
36-
# end
3735

3836
job = Job.new executor, args, task
3937

@@ -56,7 +54,22 @@ def post(executor, *args, &task)
5654
private
5755

5856
def call_job(job)
59-
job.executor.post { work(job) }
57+
did_it_run = begin
58+
job.executor.post { work(job) }
59+
true
60+
rescue RejectedExecutionError => ex
61+
false
62+
end
63+
64+
# TODO not the best idea to run it myself
65+
unless did_it_run
66+
begin
67+
work job
68+
rescue => ex
69+
# let it fail
70+
log DEBUG, ex
71+
end
72+
end
6073
end
6174

6275
# ensures next job is executed if any is stashed

lib/concurrent/executors.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66
require 'concurrent/executor/single_thread_executor'
77
require 'concurrent/executor/thread_pool_executor'
88
require 'concurrent/executor/timer_set'
9-
require 'concurrent/executor/one_by_one'
9+
require 'concurrent/executor/serialized_execution'

spec/concurrent/agent_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def trigger_observable(observable)
164164
subject.post { nil }
165165
sleep(0.1)
166166
subject.
167-
instance_variable_get(:@one_by_one).
167+
instance_variable_get(:@serialized_execution).
168168
instance_variable_get(:@stash).
169169
size.should eq 2
170170
end

0 commit comments

Comments
 (0)