Skip to content

Commit 6a5d9bc

Browse files
committed
Merge pull request #238 from ruby-concurrency/semaphore-pure-java
Implementation of Concurrent::JavaSemaphore in pure Java.
2 parents 218d434 + 30fbf4b commit 6a5d9bc

File tree

3 files changed

+148
-64
lines changed

3 files changed

+148
-64
lines changed

ext/ConcurrentRubyExtService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ public boolean basicLoad(final Ruby runtime) throws IOException {
88
new com.concurrent_ruby.ext.AtomicReferenceLibrary().load(runtime, false);
99
new com.concurrent_ruby.ext.JavaAtomicBooleanLibrary().load(runtime, false);
1010
new com.concurrent_ruby.ext.JavaAtomicFixnumLibrary().load(runtime, false);
11+
new com.concurrent_ruby.ext.JavaSemaphoreLibrary().load(runtime, false);
1112
return true;
1213
}
1314
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package com.concurrent_ruby.ext;
2+
3+
import java.io.IOException;
4+
import java.util.concurrent.Semaphore;
5+
import org.jruby.Ruby;
6+
import org.jruby.RubyBoolean;
7+
import org.jruby.RubyClass;
8+
import org.jruby.RubyFixnum;
9+
import org.jruby.RubyModule;
10+
import org.jruby.RubyNumeric;
11+
import org.jruby.RubyObject;
12+
import org.jruby.anno.JRubyClass;
13+
import org.jruby.anno.JRubyMethod;
14+
import org.jruby.runtime.ObjectAllocator;
15+
import org.jruby.runtime.ThreadContext;
16+
import org.jruby.runtime.builtin.IRubyObject;
17+
18+
public class JavaSemaphoreLibrary {
19+
20+
public void load(Ruby runtime, boolean wrap) throws IOException {
21+
RubyModule concurrentMod = runtime.defineModule("Concurrent");
22+
RubyClass atomicCls = concurrentMod.defineClassUnder("JavaSemaphore", runtime.getObject(), JRUBYREFERENCE_ALLOCATOR);
23+
24+
atomicCls.defineAnnotatedMethods(JavaSemaphore.class);
25+
26+
}
27+
28+
private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() {
29+
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
30+
return new JavaSemaphore(runtime, klazz);
31+
}
32+
};
33+
34+
@JRubyClass(name = "JavaSemaphore", parent = "Object")
35+
public static class JavaSemaphore extends RubyObject {
36+
37+
private JRubySemaphore semaphore;
38+
39+
public JavaSemaphore(Ruby runtime, RubyClass metaClass) {
40+
super(runtime, metaClass);
41+
}
42+
43+
@JRubyMethod
44+
public IRubyObject initialize(ThreadContext context, IRubyObject value) {
45+
this.semaphore = new JRubySemaphore(rubyFixnumToInt(value, "count"));
46+
return context.nil;
47+
}
48+
49+
@JRubyMethod
50+
public IRubyObject acquire(ThreadContext context, IRubyObject value) throws InterruptedException {
51+
this.semaphore.acquire(rubyFixnumToInt(value, "permits"));
52+
return context.nil;
53+
}
54+
55+
@JRubyMethod(name = "available_permits")
56+
public IRubyObject availablePermits(ThreadContext context) {
57+
return new RubyFixnum(getRuntime(), this.semaphore.availablePermits());
58+
}
59+
60+
@JRubyMethod(name = "drain_permits")
61+
public IRubyObject drainPermits(ThreadContext context) {
62+
return new RubyFixnum(getRuntime(), this.semaphore.drainPermits());
63+
}
64+
65+
@JRubyMethod
66+
public IRubyObject acquire(ThreadContext context) throws InterruptedException {
67+
this.semaphore.acquire(1);
68+
return context.nil;
69+
}
70+
71+
@JRubyMethod(name = "try_acquire")
72+
public IRubyObject tryAcquire(ThreadContext context) throws InterruptedException {
73+
return RubyBoolean.newBoolean(getRuntime(), semaphore.tryAcquire(1));
74+
}
75+
76+
@JRubyMethod(name = "try_acquire")
77+
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits) throws InterruptedException {
78+
return RubyBoolean.newBoolean(getRuntime(), semaphore.tryAcquire(rubyFixnumToInt(permits, "permits")));
79+
}
80+
81+
@JRubyMethod(name = "try_acquire")
82+
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, IRubyObject timeout) throws InterruptedException {
83+
return RubyBoolean.newBoolean(getRuntime(),
84+
semaphore.tryAcquire(
85+
rubyFixnumToInt(permits, "permits"),
86+
rubyNumericToLong(timeout, "timeout"),
87+
java.util.concurrent.TimeUnit.SECONDS)
88+
);
89+
}
90+
91+
@JRubyMethod
92+
public IRubyObject release(ThreadContext context) {
93+
this.semaphore.release(1);
94+
return RubyBoolean.newBoolean(getRuntime(), true);
95+
}
96+
97+
@JRubyMethod
98+
public IRubyObject release(ThreadContext context, IRubyObject value) {
99+
this.semaphore.release(rubyFixnumToInt(value, "permits"));
100+
return RubyBoolean.newBoolean(getRuntime(), true);
101+
}
102+
103+
@JRubyMethod(name = "reduce_permits")
104+
public IRubyObject reducePermits(ThreadContext context, IRubyObject reduction) throws InterruptedException {
105+
this.semaphore.publicReducePermits(rubyFixnumToInt(reduction, "reduction"));
106+
return context.nil;
107+
}
108+
109+
private int rubyFixnumToInt(IRubyObject value, String paramName) {
110+
if (value instanceof RubyFixnum && ((RubyFixnum) value).getLongValue() > 0) {
111+
RubyFixnum fixNum = (RubyFixnum) value;
112+
return (int) fixNum.getLongValue();
113+
} else {
114+
throw getRuntime().newArgumentError(paramName + " must be in integer greater than zero");
115+
}
116+
}
117+
118+
private long rubyNumericToLong(IRubyObject value, String paramName) {
119+
if (value instanceof RubyNumeric && ((RubyNumeric) value).getDoubleValue() > 0) {
120+
RubyNumeric fixNum = (RubyNumeric) value;
121+
return fixNum.getLongValue();
122+
} else {
123+
throw getRuntime().newArgumentError(paramName + " must be in float greater than zero");
124+
}
125+
}
126+
127+
class JRubySemaphore extends Semaphore {
128+
129+
public JRubySemaphore(int permits) {
130+
super(permits);
131+
}
132+
133+
public JRubySemaphore(int permits, boolean value) {
134+
super(permits, value);
135+
}
136+
137+
public void publicReducePermits(int i) {
138+
reducePermits(i);
139+
}
140+
141+
}
142+
}
143+
}
144+

lib/concurrent/atomic/semaphore.rb

Lines changed: 3 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def release(permits = 1)
109109
end
110110

111111
# @!macro [attach] semaphore_method_reduce_permits
112-
#
112+
#
113113
# @api private
114114
#
115115
# Shrinks the number of available permits by the indicated reduction.
@@ -126,7 +126,7 @@ def reduce_permits(reduction)
126126
fail ArgumentError, 'reduction must be an non-negative integer'
127127
end
128128
@mutex.synchronize { @free -= reduction }
129-
nil
129+
nil
130130
end
131131

132132
private
@@ -153,73 +153,12 @@ def try_acquire_timed(permits, timeout)
153153
if RUBY_PLATFORM == 'java'
154154

155155
# @!macro semaphore
156-
#
156+
#
157157
# A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each {#acquire} blocks if necessary
158158
# until a permit is available, and then takes it. Each {#release} adds a permit,
159159
# potentially releasing a blocking acquirer.
160160
# However, no actual permit objects are used; the Semaphore just keeps a count of the number available and
161161
# acts accordingly.
162-
class JavaSemaphore
163-
# @!macro semaphore_method_initialize
164-
def initialize(count)
165-
unless count.is_a?(Fixnum) && count >= 0
166-
fail(ArgumentError,
167-
'count must be in integer greater than or equal zero')
168-
end
169-
@semaphore = java.util.concurrent.Semaphore.new(count)
170-
end
171-
172-
# @!macro semaphore_method_acquire
173-
def acquire(permits = 1)
174-
unless permits.is_a?(Fixnum) && permits > 0
175-
fail ArgumentError, 'permits must be an integer greater than zero'
176-
end
177-
@semaphore.acquire(permits)
178-
end
179-
180-
# @!macro semaphore_method_available_permits
181-
def available_permits
182-
@semaphore.availablePermits
183-
end
184-
185-
# @!macro semaphore_method_drain_permits
186-
def drain_permits
187-
@semaphore.drainPermits
188-
end
189-
190-
# @!macro semaphore_method_try_acquire
191-
def try_acquire(permits = 1, timeout = nil)
192-
unless permits.is_a?(Fixnum) && permits > 0
193-
fail ArgumentError, 'permits must be an integer greater than zero'
194-
end
195-
if timeout.nil?
196-
@semaphore.tryAcquire(permits)
197-
else
198-
@semaphore.tryAcquire(permits,
199-
timeout,
200-
java.util.concurrent.TimeUnit::SECONDS)
201-
end
202-
end
203-
204-
# @!macro semaphore_method_release
205-
def release(permits = 1)
206-
unless permits.is_a?(Fixnum) && permits > 0
207-
fail ArgumentError, 'permits must be an integer greater than zero'
208-
end
209-
@semaphore.release(permits)
210-
true
211-
end
212-
213-
# @!macro semaphore_method_reduce_permits
214-
def reduce_permits(reduction)
215-
unless reduction.is_a?(Fixnum) && reduction >= 0
216-
fail ArgumentError, 'reduction must be an non-negative integer'
217-
end
218-
@semaphore.reducePermits(reduction)
219-
end
220-
end
221-
222-
# @!macro semaphore
223162
class Semaphore < JavaSemaphore
224163
end
225164

0 commit comments

Comments
 (0)