Skip to content

Commit 24ce228

Browse files
committed
Use safe synchronization in SerializedExecution
1 parent b02d117 commit 24ce228

File tree

1 file changed

+16
-19
lines changed

1 file changed

+16
-19
lines changed

lib/concurrent/executor/serialized_execution.rb

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
require 'delegate'
22
require 'concurrent/executor/executor'
33
require 'concurrent/logging'
4+
require 'concurrent/atomic/synchronization'
45

56
module Concurrent
67

78
# Ensures passed jobs in a serialized order never running at the same time.
89
class SerializedExecution
910
include Logging
11+
include Synchronization
1012

1113
Job = Struct.new(:executor, :args, :block) do
1214
def call
@@ -15,9 +17,10 @@ def call
1517
end
1618

1719
def initialize
18-
@being_executed = false
19-
@stash = []
20-
@mutex = Mutex.new
20+
synchronize do
21+
@being_executed = false
22+
@stash = []
23+
end
2124
end
2225

2326
# Submit a task to the executor for asynchronous processing.
@@ -51,18 +54,15 @@ def posts(posts)
5154

5255
jobs = posts.map { |executor, args, task| Job.new executor, args, task }
5356

54-
begin
55-
@mutex.lock
56-
job_to_post = if @being_executed
57-
@stash.push(*jobs)
58-
nil
59-
else
60-
@being_executed = true
61-
@stash.push(*jobs[1..-1])
62-
jobs.first
63-
end
64-
ensure
65-
@mutex.unlock
57+
job_to_post = synchronize do
58+
if @being_executed
59+
@stash.push(*jobs)
60+
nil
61+
else
62+
@being_executed = true
63+
@stash.push(*jobs[1..-1])
64+
jobs.first
65+
end
6666
end
6767

6868
call_job job_to_post if job_to_post
@@ -94,11 +94,8 @@ def call_job(job)
9494
def work(job)
9595
job.call
9696
ensure
97-
begin
98-
@mutex.lock
97+
synchronize do
9998
job = @stash.shift || (@being_executed = false)
100-
ensure
101-
@mutex.unlock
10299
end
103100

104101
call_job job if job

0 commit comments

Comments
 (0)