Skip to content

Commit 6511ed4

Browse files
committed
Allow semaphore acquisition with blocks
Ruby often idomaticaly uses blocks to open a resource and close it again automatically. For example: file = File.open('file', 'r') # do work with file file.close # vs File.open('file', 'r') do |file| # do work with file end # file closed automatically This teaches semaphores a similar technique: semaphore = Concurrent::Semaphore.new(1) semaphore.acquire # do work semaphore.release # vs semaphore.acquire do # do work end # permit automatically released
1 parent 9608846 commit 6511ed4

File tree

4 files changed

+286
-66
lines changed

4 files changed

+286
-66
lines changed

ext/concurrent-ruby/com/concurrent_ruby/ext/JavaSemaphoreLibrary.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.jruby.RubyObject;
1111
import org.jruby.anno.JRubyClass;
1212
import org.jruby.anno.JRubyMethod;
13+
import org.jruby.runtime.Block;
1314
import org.jruby.runtime.ObjectAllocator;
1415
import org.jruby.runtime.ThreadContext;
1516
import org.jruby.runtime.builtin.IRubyObject;
@@ -45,13 +46,13 @@ public IRubyObject initialize(ThreadContext context, IRubyObject value) {
4546
}
4647

4748
@JRubyMethod
48-
public IRubyObject acquire(ThreadContext context) throws InterruptedException {
49-
return this.acquire(context, 1);
49+
public IRubyObject acquire(ThreadContext context, final Block block) throws InterruptedException {
50+
return this.acquire(context, 1, block);
5051
}
5152

5253
@JRubyMethod
53-
public IRubyObject acquire(ThreadContext context, IRubyObject permits) throws InterruptedException {
54-
return this.acquire(context, rubyFixnumToPositiveInt(permits, "permits"));
54+
public IRubyObject acquire(ThreadContext context, IRubyObject permits, final Block block) throws InterruptedException {
55+
return this.acquire(context, rubyFixnumToPositiveInt(permits, "permits"), block);
5556
}
5657

5758
@JRubyMethod(name = "available_permits")
@@ -65,31 +66,31 @@ public IRubyObject drainPermits(ThreadContext context) {
6566
}
6667

6768
@JRubyMethod(name = "try_acquire")
68-
public IRubyObject tryAcquire(ThreadContext context) throws InterruptedException {
69+
public IRubyObject tryAcquire(ThreadContext context, final Block block) throws InterruptedException {
6970
int permitsInt = 1;
7071
boolean acquired = semaphore.tryAcquire(permitsInt);
7172

72-
return triedAcquire(context, acquired);
73+
return triedAcquire(context, permitsInt, acquired, block);
7374
}
7475

7576
@JRubyMethod(name = "try_acquire")
76-
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits) throws InterruptedException {
77+
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, final Block block) throws InterruptedException {
7778
int permitsInt = rubyFixnumToPositiveInt(permits, "permits");
7879
boolean acquired = semaphore.tryAcquire(permitsInt);
7980

80-
return triedAcquire(context, acquired);
81+
return triedAcquire(context, permitsInt, acquired, block);
8182
}
8283

8384
@JRubyMethod(name = "try_acquire")
84-
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, IRubyObject timeout) throws InterruptedException {
85+
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, IRubyObject timeout, final Block block) throws InterruptedException {
8586
int permitsInt = rubyFixnumToPositiveInt(permits, "permits");
8687
boolean acquired = semaphore.tryAcquire(
8788
permitsInt,
8889
rubyNumericToLong(timeout, "timeout"),
89-
java.util.concurrent.TimeUnit.SECONDS,
90+
java.util.concurrent.TimeUnit.SECONDS
9091
);
9192

92-
return triedAcquire(context, acquired);
93+
return triedAcquire(context, permitsInt, acquired, block);
9394
}
9495

9596
@JRubyMethod
@@ -110,14 +111,27 @@ public IRubyObject reducePermits(ThreadContext context, IRubyObject reduction) t
110111
return context.nil;
111112
}
112113

113-
private IRubyObject acquire(ThreadContext context, int permits) throws InterruptedException {
114+
private IRubyObject acquire(ThreadContext context, int permits, final Block block) throws InterruptedException {
114115
this.semaphore.acquire(permits);
115116

116-
return context.nil;
117+
if (!block.isGiven()) return context.nil;
118+
119+
try {
120+
return block.yieldSpecific(context);
121+
} finally {
122+
this.semaphore.release(permits);
123+
}
117124
}
118125

119-
private IRubyObject triedAcquire(ThreadContext context, int permits, boolean acquired) {
120-
return getRuntime().newBoolean(acquired);
126+
private IRubyObject triedAcquire(ThreadContext context, int permits, boolean acquired, final Block block) {
127+
if (!block.isGiven()) return getRuntime().newBoolean(acquired);
128+
if (!acquired) return context.nil;
129+
130+
try {
131+
return block.yieldSpecific(context);
132+
} finally {
133+
this.semaphore.release(permits);
134+
}
121135
}
122136

123137
private int rubyFixnumInt(IRubyObject value, String paramName) {

lib/concurrent-ruby/concurrent/atomic/mutex_semaphore.rb

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@ def acquire(permits = 1)
2525
try_acquire_timed(permits, nil)
2626
end
2727

28-
nil
28+
return unless block_given?
29+
30+
begin
31+
yield
32+
ensure
33+
release(permits)
34+
end
2935
end
3036

3137
# @!macro semaphore_method_available_permits
@@ -49,13 +55,22 @@ def try_acquire(permits = 1, timeout = nil)
4955
Utility::NativeInteger.ensure_integer_and_bounds permits
5056
Utility::NativeInteger.ensure_positive permits
5157

52-
synchronize do
58+
acquired = synchronize do
5359
if timeout.nil?
5460
try_acquire_now(permits)
5561
else
5662
try_acquire_timed(permits, timeout)
5763
end
5864
end
65+
66+
return acquired unless block_given?
67+
return unless acquired
68+
69+
begin
70+
yield
71+
ensure
72+
release(permits)
73+
end
5974
end
6075

6176
# @!macro semaphore_method_release

lib/concurrent-ruby/concurrent/atomic/semaphore.rb

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@ module Concurrent
1616
# @!macro semaphore_method_acquire
1717
#
1818
# Acquires the given number of permits from this semaphore,
19-
# blocking until all are available.
19+
# blocking until all are available. If a block is given,
20+
# yields to it and releases the permits afterwards.
2021
#
2122
# @param [Fixnum] permits Number of permits to acquire
2223
#
2324
# @raise [ArgumentError] if `permits` is not an integer or is less than
2425
# one
2526
#
26-
# @return [nil]
27+
# @return [nil, BasicObject] Without a block, `nil` is returned. If a block
28+
# is given, its return value is returned.
2729

2830
# @!macro semaphore_method_available_permits
2931
#
@@ -41,7 +43,9 @@ module Concurrent
4143
#
4244
# Acquires the given number of permits from this semaphore,
4345
# only if all are available at the time of invocation or within
44-
# `timeout` interval
46+
# `timeout` interval. If a block is given, yields to it if the permits
47+
# were successfully acquired, and releases them afterward, returning the
48+
# block's return value.
4549
#
4650
# @param [Fixnum] permits the number of permits to acquire
4751
#
@@ -51,8 +55,10 @@ module Concurrent
5155
# @raise [ArgumentError] if `permits` is not an integer or is less than
5256
# one
5357
#
54-
# @return [Boolean] `false` if no permits are available, `true` when
55-
# acquired a permit
58+
# @return [true, false, nil, BasicObject] `false` if no permits are
59+
# available, `true` when acquired a permit. If a block is given, the
60+
# block's return value is returned if the permits were acquired; if not,
61+
# `nil` is returned.
5662

5763
# @!macro semaphore_method_release
5864
#
@@ -106,6 +112,8 @@ module Concurrent
106112
# releasing a blocking acquirer.
107113
# However, no actual permit objects are used; the Semaphore just keeps a
108114
# count of the number available and acts accordingly.
115+
# Alternatively, permits may be acquired within a block, and automatically
116+
# released after the block finishes executing.
109117
#
110118
# @!macro semaphore_public_api
111119
# @example
@@ -140,6 +148,19 @@ module Concurrent
140148
# # Thread 4 releasing semaphore
141149
# # Thread 1 acquired semaphore
142150
#
151+
# @example
152+
# semaphore = Concurrent::Semaphore.new(1)
153+
#
154+
# puts semaphore.available_permits
155+
# semaphore.acquire do
156+
# puts semaphore.available_permits
157+
# end
158+
# puts semaphore.available_permits
159+
#
160+
# # prints:
161+
# # 1
162+
# # 0
163+
# # 1
143164
class Semaphore < SemaphoreImplementation
144165
end
145166
end

0 commit comments

Comments
 (0)