Skip to content

Commit b02d117

Browse files
committed
Allow to submit atomically more than 1 task to SerializedExecution
1 parent f1471e7 commit b02d117

File tree

1 file changed

+26
-10
lines changed

1 file changed

+26
-10
lines changed

lib/concurrent/executor/serialized_execution.rb

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,39 @@ def initialize
3333
#
3434
# @raise [ArgumentError] if no task is given
3535
def post(executor, *args, &task)
36-
return nil if task.nil?
36+
posts [[executor, args, task]]
37+
true
38+
end
39+
40+
# As {#post} but allows to submit multiple tasks at once, it's guaranteed that they will not
41+
# be interleaved by other tasks.
42+
#
43+
# @param [Array<Array(Executor, Array<Object>, Proc)>] posts array of triplets where
44+
# first is a {Executor}, second is array of args for task, third is a task (Proc)
45+
def posts(posts)
46+
# if can_overflow?
47+
# raise ArgumentError, 'SerializedExecution does not support thread-pools which can overflow'
48+
# end
49+
50+
return nil if posts.empty?
3751

38-
job = Job.new executor, args, task
52+
jobs = posts.map { |executor, args, task| Job.new executor, args, task }
3953

4054
begin
4155
@mutex.lock
42-
post = if @being_executed
43-
@stash << job
44-
false
45-
else
46-
@being_executed = true
47-
end
56+
job_to_post = if @being_executed
57+
@stash.push(*jobs)
58+
nil
59+
else
60+
@being_executed = true
61+
@stash.push(*jobs[1..-1])
62+
jobs.first
63+
end
4864
ensure
4965
@mutex.unlock
5066
end
5167

52-
call_job job if post
68+
call_job job_to_post if job_to_post
5369
true
5470
end
5571

@@ -98,7 +114,7 @@ class SerializedExecutionDelegator < SimpleDelegator
98114
include SerialExecutor
99115

100116
def initialize(executor)
101-
@executor = executor
117+
@executor = executor
102118
@serializer = SerializedExecution.new
103119
super(executor)
104120
end

0 commit comments

Comments
 (0)