1
- require 'concurrent/atomic/condition '
1
+ require 'concurrent/synchronization '
2
2
3
3
module Concurrent
4
- class MutexSemaphore
4
+ class MutexSemaphore < Synchronization :: Object
5
5
# @!macro [attach] semaphore_method_initialize
6
6
#
7
7
# Create a new `Semaphore` with the initial `count`.
@@ -13,9 +13,7 @@ def initialize(count)
13
13
unless count . is_a? ( Fixnum ) && count >= 0
14
14
fail ArgumentError , 'count must be an non-negative integer'
15
15
end
16
- @mutex = Mutex . new
17
- @condition = Condition . new
18
- @free = count
16
+ super ( count )
19
17
end
20
18
21
19
# @!macro [attach] semaphore_method_acquire
@@ -33,7 +31,7 @@ def acquire(permits = 1)
33
31
unless permits . is_a? ( Fixnum ) && permits > 0
34
32
fail ArgumentError , 'permits must be an integer greater than zero'
35
33
end
36
- @mutex . synchronize do
34
+ synchronize do
37
35
try_acquire_timed ( permits , nil )
38
36
nil
39
37
end
@@ -45,7 +43,7 @@ def acquire(permits = 1)
45
43
#
46
44
# @return [Integer]
47
45
def available_permits
48
- @mutex . synchronize { @free }
46
+ synchronize { @free }
49
47
end
50
48
51
49
# @!macro [attach] semaphore_method_drain_permits
@@ -54,7 +52,7 @@ def available_permits
54
52
#
55
53
# @return [Integer]
56
54
def drain_permits
57
- @mutex . synchronize do
55
+ synchronize do
58
56
@free . tap { |_ | @free = 0 }
59
57
end
60
58
end
@@ -79,7 +77,7 @@ def try_acquire(permits = 1, timeout = nil)
79
77
unless permits . is_a? ( Fixnum ) && permits > 0
80
78
fail ArgumentError , 'permits must be an integer greater than zero'
81
79
end
82
- @mutex . synchronize do
80
+ synchronize do
83
81
if timeout . nil?
84
82
try_acquire_now ( permits )
85
83
else
@@ -101,9 +99,9 @@ def release(permits = 1)
101
99
unless permits . is_a? ( Fixnum ) && permits > 0
102
100
fail ArgumentError , 'permits must be an integer greater than zero'
103
101
end
104
- @mutex . synchronize do
102
+ synchronize do
105
103
@free += permits
106
- permits . times { @condition . signal }
104
+ permits . times { ns_signal }
107
105
end
108
106
nil
109
107
end
@@ -125,10 +123,16 @@ def reduce_permits(reduction)
125
123
unless reduction . is_a? ( Fixnum ) && reduction >= 0
126
124
fail ArgumentError , 'reduction must be an non-negative integer'
127
125
end
128
- @mutex . synchronize { @free -= reduction }
126
+ synchronize { @free -= reduction }
129
127
nil
130
128
end
131
129
130
+ protected
131
+
132
+ def ns_initialize ( count )
133
+ @free = count
134
+ end
135
+
132
136
private
133
137
134
138
def try_acquire_now ( permits )
@@ -141,12 +145,13 @@ def try_acquire_now(permits)
141
145
end
142
146
143
147
def try_acquire_timed ( permits , timeout )
144
- remaining = Condition ::Result . new ( timeout )
145
- while !try_acquire_now ( permits ) && remaining . can_wait?
146
- @condition . signal
147
- remaining = @condition . wait ( @mutex , remaining . remaining_time )
148
- end
149
- remaining . can_wait? ? true : false
148
+ ns_wait_until ( timeout ) { try_acquire_now ( permits ) }
149
+ #remaining = Condition::Result.new(timeout)
150
+ #while !try_acquire_now(permits) && remaining.can_wait?
151
+ #@condition.signal
152
+ #remaining = @condition.wait(@mutex, remaining.remaining_time)
153
+ #end
154
+ #remaining.can_wait? ? true : false
150
155
end
151
156
end
152
157
0 commit comments