Skip to content

Commit 1a0cf49

Browse files
authored
Merge pull request #832 from sambostock/semaphore-blocks
Allow semaphore permit acquisition with blocks
2 parents 4d5226d + 6511ed4 commit 1a0cf49

File tree

4 files changed

+310
-73
lines changed

4 files changed

+310
-73
lines changed

ext/concurrent-ruby/com/concurrent_ruby/ext/JavaSemaphoreLibrary.java

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.jruby.RubyObject;
1111
import org.jruby.anno.JRubyClass;
1212
import org.jruby.anno.JRubyMethod;
13+
import org.jruby.runtime.Block;
1314
import org.jruby.runtime.ObjectAllocator;
1415
import org.jruby.runtime.ThreadContext;
1516
import org.jruby.runtime.builtin.IRubyObject;
@@ -45,9 +46,13 @@ public IRubyObject initialize(ThreadContext context, IRubyObject value) {
4546
}
4647

4748
@JRubyMethod
48-
public IRubyObject acquire(ThreadContext context, IRubyObject value) throws InterruptedException {
49-
this.semaphore.acquire(rubyFixnumToPositiveInt(value, "permits"));
50-
return context.nil;
49+
public IRubyObject acquire(ThreadContext context, final Block block) throws InterruptedException {
50+
return this.acquire(context, 1, block);
51+
}
52+
53+
@JRubyMethod
54+
public IRubyObject acquire(ThreadContext context, IRubyObject permits, final Block block) throws InterruptedException {
55+
return this.acquire(context, rubyFixnumToPositiveInt(permits, "permits"), block);
5156
}
5257

5358
@JRubyMethod(name = "available_permits")
@@ -60,30 +65,32 @@ public IRubyObject drainPermits(ThreadContext context) {
6065
return getRuntime().newFixnum(this.semaphore.drainPermits());
6166
}
6267

63-
@JRubyMethod
64-
public IRubyObject acquire(ThreadContext context) throws InterruptedException {
65-
this.semaphore.acquire(1);
66-
return context.nil;
67-
}
68-
6968
@JRubyMethod(name = "try_acquire")
70-
public IRubyObject tryAcquire(ThreadContext context) throws InterruptedException {
71-
return getRuntime().newBoolean(semaphore.tryAcquire(1));
69+
public IRubyObject tryAcquire(ThreadContext context, final Block block) throws InterruptedException {
70+
int permitsInt = 1;
71+
boolean acquired = semaphore.tryAcquire(permitsInt);
72+
73+
return triedAcquire(context, permitsInt, acquired, block);
7274
}
7375

7476
@JRubyMethod(name = "try_acquire")
75-
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits) throws InterruptedException {
76-
return getRuntime().newBoolean(semaphore.tryAcquire(rubyFixnumToPositiveInt(permits, "permits")));
77+
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, final Block block) throws InterruptedException {
78+
int permitsInt = rubyFixnumToPositiveInt(permits, "permits");
79+
boolean acquired = semaphore.tryAcquire(permitsInt);
80+
81+
return triedAcquire(context, permitsInt, acquired, block);
7782
}
7883

7984
@JRubyMethod(name = "try_acquire")
80-
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, IRubyObject timeout) throws InterruptedException {
81-
return getRuntime().newBoolean(
82-
semaphore.tryAcquire(
83-
rubyFixnumToPositiveInt(permits, "permits"),
84-
rubyNumericToLong(timeout, "timeout"),
85-
java.util.concurrent.TimeUnit.SECONDS)
86-
);
85+
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, IRubyObject timeout, final Block block) throws InterruptedException {
86+
int permitsInt = rubyFixnumToPositiveInt(permits, "permits");
87+
boolean acquired = semaphore.tryAcquire(
88+
permitsInt,
89+
rubyNumericToLong(timeout, "timeout"),
90+
java.util.concurrent.TimeUnit.SECONDS
91+
);
92+
93+
return triedAcquire(context, permitsInt, acquired, block);
8794
}
8895

8996
@JRubyMethod
@@ -93,8 +100,8 @@ public IRubyObject release(ThreadContext context) {
93100
}
94101

95102
@JRubyMethod
96-
public IRubyObject release(ThreadContext context, IRubyObject value) {
97-
this.semaphore.release(rubyFixnumToPositiveInt(value, "permits"));
103+
public IRubyObject release(ThreadContext context, IRubyObject permits) {
104+
this.semaphore.release(rubyFixnumToPositiveInt(permits, "permits"));
98105
return getRuntime().newBoolean(true);
99106
}
100107

@@ -104,6 +111,29 @@ public IRubyObject reducePermits(ThreadContext context, IRubyObject reduction) t
104111
return context.nil;
105112
}
106113

114+
private IRubyObject acquire(ThreadContext context, int permits, final Block block) throws InterruptedException {
115+
this.semaphore.acquire(permits);
116+
117+
if (!block.isGiven()) return context.nil;
118+
119+
try {
120+
return block.yieldSpecific(context);
121+
} finally {
122+
this.semaphore.release(permits);
123+
}
124+
}
125+
126+
private IRubyObject triedAcquire(ThreadContext context, int permits, boolean acquired, final Block block) {
127+
if (!block.isGiven()) return getRuntime().newBoolean(acquired);
128+
if (!acquired) return context.nil;
129+
130+
try {
131+
return block.yieldSpecific(context);
132+
} finally {
133+
this.semaphore.release(permits);
134+
}
135+
}
136+
107137
private int rubyFixnumInt(IRubyObject value, String paramName) {
108138
if (value instanceof RubyFixnum) {
109139
RubyFixnum fixNum = (RubyFixnum) value;

lib/concurrent-ruby/concurrent/atomic/mutex_semaphore.rb

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,14 @@ def acquire(permits = 1)
2323

2424
synchronize do
2525
try_acquire_timed(permits, nil)
26-
nil
26+
end
27+
28+
return unless block_given?
29+
30+
begin
31+
yield
32+
ensure
33+
release(permits)
2734
end
2835
end
2936

@@ -48,13 +55,22 @@ def try_acquire(permits = 1, timeout = nil)
4855
Utility::NativeInteger.ensure_integer_and_bounds permits
4956
Utility::NativeInteger.ensure_positive permits
5057

51-
synchronize do
58+
acquired = synchronize do
5259
if timeout.nil?
5360
try_acquire_now(permits)
5461
else
5562
try_acquire_timed(permits, timeout)
5663
end
5764
end
65+
66+
return acquired unless block_given?
67+
return unless acquired
68+
69+
begin
70+
yield
71+
ensure
72+
release(permits)
73+
end
5874
end
5975

6076
# @!macro semaphore_method_release

lib/concurrent-ruby/concurrent/atomic/semaphore.rb

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@ module Concurrent
1616
# @!macro semaphore_method_acquire
1717
#
1818
# Acquires the given number of permits from this semaphore,
19-
# blocking until all are available.
19+
# blocking until all are available. If a block is given,
20+
# yields to it and releases the permits afterwards.
2021
#
2122
# @param [Fixnum] permits Number of permits to acquire
2223
#
2324
# @raise [ArgumentError] if `permits` is not an integer or is less than
2425
# one
2526
#
26-
# @return [nil]
27+
# @return [nil, BasicObject] Without a block, `nil` is returned. If a block
28+
# is given, its return value is returned.
2729

2830
# @!macro semaphore_method_available_permits
2931
#
@@ -41,7 +43,9 @@ module Concurrent
4143
#
4244
# Acquires the given number of permits from this semaphore,
4345
# only if all are available at the time of invocation or within
44-
# `timeout` interval
46+
# `timeout` interval. If a block is given, yields to it if the permits
47+
# were successfully acquired, and releases them afterward, returning the
48+
# block's return value.
4549
#
4650
# @param [Fixnum] permits the number of permits to acquire
4751
#
@@ -51,8 +55,10 @@ module Concurrent
5155
# @raise [ArgumentError] if `permits` is not an integer or is less than
5256
# one
5357
#
54-
# @return [Boolean] `false` if no permits are available, `true` when
55-
# acquired a permit
58+
# @return [true, false, nil, BasicObject] `false` if no permits are
59+
# available, `true` when acquired a permit. If a block is given, the
60+
# block's return value is returned if the permits were acquired; if not,
61+
# `nil` is returned.
5662

5763
# @!macro semaphore_method_release
5864
#
@@ -106,6 +112,8 @@ module Concurrent
106112
# releasing a blocking acquirer.
107113
# However, no actual permit objects are used; the Semaphore just keeps a
108114
# count of the number available and acts accordingly.
115+
# Alternatively, permits may be acquired within a block, and automatically
116+
# released after the block finishes executing.
109117
#
110118
# @!macro semaphore_public_api
111119
# @example
@@ -140,6 +148,19 @@ module Concurrent
140148
# # Thread 4 releasing semaphore
141149
# # Thread 1 acquired semaphore
142150
#
151+
# @example
152+
# semaphore = Concurrent::Semaphore.new(1)
153+
#
154+
# puts semaphore.available_permits
155+
# semaphore.acquire do
156+
# puts semaphore.available_permits
157+
# end
158+
# puts semaphore.available_permits
159+
#
160+
# # prints:
161+
# # 1
162+
# # 0
163+
# # 1
143164
class Semaphore < SemaphoreImplementation
144165
end
145166
end

0 commit comments

Comments
 (0)