Skip to content

Commit 7be7455

Browse files
committed
Merge pull request #357 from ruby-concurrency/synchronization
Synchronisation fixes
2 parents 1d31751 + ee9b7ed commit 7be7455

File tree

8 files changed

+134
-97
lines changed

8 files changed

+134
-97
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
MRI 1.9.3, 2.0, 2.1, 2.2, JRuby (1.9 mode), and Rubinius 2.x are supported.
4141
This gem should be fully compatible with any interpreter that is compliant with Ruby 1.9.3 or newer.
42+
Java 8 is required for JRuby (Java 7 support is deprecated in version 0.9 and will be removed in 1.0).
4243

4344
## Features & Documentation
4445

doc/synchronization.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,14 @@ We are interested in following behaviors:
139139

140140
### Variables
141141

142-
- **Local variables** - atomic assignment, non-volatile.
142+
- **Local variables** - atomic assignment (only Integer and Object), non-volatile.
143143
- Consequence: a lambda defined on `thread1` executing on `thread2` may not see updated values in local variables captured in its closure.
144144
- Reason: local variables are non-volatile on Jruby and Rubinius.
145-
- **Instance variables** - atomic assignment, non-volatile.
145+
- **Instance variables** - atomic assignment (only Integer and Object), non-volatile.
146146
- Consequence: Different thread may see old values; different thread may see not fully-initialized object.
147147
- Reason: local variables are non-volatile on Jruby and Rubinius.
148148
- **Constants** - atomic assignment, volatile.
149+
- **Assignments of Float** may not be atomic (some implementations (e.g. Truffle) may use native double).
149150

150151
Other:
151152

ext/com/concurrent_ruby/ext/SynchronizationLibrary.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121

2222
public class SynchronizationLibrary implements Library {
2323

24+
private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() {
25+
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
26+
return new JavaObject(runtime, klazz);
27+
}
28+
};
29+
2430
public void load(Ruby runtime, boolean wrap) throws IOException {
2531
RubyModule synchronizationModule = runtime.
2632
defineModule("Concurrent").
@@ -36,15 +42,13 @@ public void load(Ruby runtime, boolean wrap) throws IOException {
3642
synchronizedObjectJavaClass.defineAnnotatedMethods(JavaObject.class);
3743
}
3844

39-
private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() {
40-
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
41-
return new JavaObject(runtime, klazz);
42-
}
43-
};
44-
4545
@JRubyClass(name = "JavaObject", parent = "AbstractObject")
4646
public static class JavaObject extends RubyObject {
4747

48+
public static final long AN_VOLATILE_FIELD_OFFSET =
49+
UnsafeHolder.fieldOffset(JavaObject.class, "anVolatileField");
50+
private volatile int anVolatileField = 0;
51+
4852
public JavaObject(Ruby runtime, RubyClass metaClass) {
4953
super(runtime, metaClass);
5054
}
@@ -107,24 +111,30 @@ public IRubyObject nsBroadcast(ThreadContext context) {
107111

108112
@JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PROTECTED)
109113
public IRubyObject ensureIvarVisibilityBang(ThreadContext context) {
110-
if (UnsafeHolder.SUPPORTS_FENCES)
111-
UnsafeHolder.storeFence();
112-
else
113-
anVolatileField = 1;
114+
if (UnsafeHolder.U == null) {
115+
// We are screwed
116+
throw new UnsupportedOperationException();
117+
} else if (UnsafeHolder.SUPPORTS_FENCES)
118+
// We have to prevent ivar writes to reordered with storing of the final instance reference
119+
// Therefore wee need a fullFence to prevent reordering in both directions.
120+
UnsafeHolder.fullFence();
121+
else {
122+
// Assumption that this is not eliminated, if false it will break non x86 platforms.
123+
UnsafeHolder.U.putIntVolatile(this, AN_VOLATILE_FIELD_OFFSET, 1);
124+
UnsafeHolder.U.getIntVolatile(this, AN_VOLATILE_FIELD_OFFSET);
125+
}
114126
return context.nil;
115127
}
116128

117-
private volatile int anVolatileField = 0; // TODO unused on JAVA8
118-
public static final long AN_VOLATILE_FIELD_OFFSET =
119-
UnsafeHolder.fieldOffset(JavaObject.class, "anVolatileField");
120-
121129
@JRubyMethod(name = "instance_variable_get_volatile", visibility = Visibility.PROTECTED)
122130
public IRubyObject instanceVariableGetVolatile(ThreadContext context, IRubyObject name) {
123131
if (UnsafeHolder.U == null) {
132+
// TODO: Possibly dangerous, there may be a deadlock on the this
124133
synchronized (this) {
125134
return instance_variable_get(context, name);
126135
}
127136
} else if (UnsafeHolder.SUPPORTS_FENCES) {
137+
// ensure we see latest value
128138
UnsafeHolder.loadFence();
129139
return instance_variable_get(context, name);
130140
} else {
@@ -136,16 +146,18 @@ public IRubyObject instanceVariableGetVolatile(ThreadContext context, IRubyObjec
136146
@JRubyMethod(name = "instance_variable_set_volatile", visibility = Visibility.PROTECTED)
137147
public IRubyObject InstanceVariableSetVolatile(ThreadContext context, IRubyObject name, IRubyObject value) {
138148
if (UnsafeHolder.U == null) {
149+
// TODO: Possibly dangerous, there may be a deadlock on the this
139150
synchronized (this) {
140151
return instance_variable_set(name, value);
141152
}
142153
} else if (UnsafeHolder.SUPPORTS_FENCES) {
143-
IRubyObject result = instance_variable_set(name, value);
154+
final IRubyObject result = instance_variable_set(name, value);
155+
// ensure we make latest value visible
144156
UnsafeHolder.storeFence();
145157
return result;
146158
} else {
159+
final IRubyObject result = instance_variable_set(name, value);
147160
UnsafeHolder.U.putIntVolatile(this, AN_VOLATILE_FIELD_OFFSET, 1);
148-
IRubyObject result = instance_variable_set(name, value);
149161
return result;
150162
}
151163
}

lib/concurrent/concern/deprecation.rb

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,25 @@ module Deprecation
1010
include Concern::Logging
1111

1212
def deprecated(message, strip = 2)
13-
caller_line = caller(strip).first
14-
klass = if Class === self
13+
caller_line = caller(strip).first if strip > 0
14+
klass = if Module === self
1515
self
1616
else
1717
self.class
1818
end
19-
log WARN, klass.to_s, format("[DEPRECATED] %s\ncalled on: %s", message, caller_line)
19+
message = if strip > 0
20+
format("[DEPRECATED] %s\ncalled on: %s", message, caller_line)
21+
else
22+
format('[DEPRECATED] %s', message)
23+
end
24+
log WARN, klass.to_s, message
2025
end
2126

2227
def deprecated_method(old_name, new_name)
2328
deprecated "`#{old_name}` is deprecated and it'll removed in next release, use `#{new_name}` instead", 3
2429
end
30+
31+
extend self
2532
end
2633
end
2734
end

lib/concurrent/concern/logging.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'logger'
2+
require 'concurrent/configuration'
23

34
module Concurrent
45
module Concern

lib/concurrent/configuration.rb

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,37 +11,6 @@ module Concurrent
1111
extend Concern::Logging
1212
extend Concern::Deprecation
1313

14-
# Suppresses all output when used for logging.
15-
NULL_LOGGER = lambda { |level, progname, message = nil, &block| }
16-
17-
# @!visibility private
18-
GLOBAL_LOGGER = AtomicReference.new(NULL_LOGGER)
19-
private_constant :GLOBAL_LOGGER
20-
21-
# @!visibility private
22-
GLOBAL_FAST_EXECUTOR = Delay.new { Concurrent.new_fast_executor(auto_terminate: true) }
23-
private_constant :GLOBAL_FAST_EXECUTOR
24-
25-
# @!visibility private
26-
GLOBAL_IO_EXECUTOR = Delay.new { Concurrent.new_io_executor(auto_terminate: true) }
27-
private_constant :GLOBAL_IO_EXECUTOR
28-
29-
# @!visibility private
30-
GLOBAL_TIMER_SET = Delay.new { TimerSet.new(auto_terminate: true) }
31-
private_constant :GLOBAL_TIMER_SET
32-
33-
# @!visibility private
34-
GLOBAL_IMMEDIATE_EXECUTOR = ImmediateExecutor.new
35-
private_constant :GLOBAL_IMMEDIATE_EXECUTOR
36-
37-
def self.global_logger
38-
GLOBAL_LOGGER.value
39-
end
40-
41-
def self.global_logger=(value)
42-
GLOBAL_LOGGER.value = value
43-
end
44-
4514
# @return [Logger] Logger with provided level and output.
4615
def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr)
4716
logger = Logger.new(output)
@@ -62,17 +31,48 @@ def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr)
6231
progname,
6332
formatted_message
6433
end
65-
logger
34+
35+
lambda do |level, progname, message = nil, &block|
36+
logger.add level, message, progname, &block
37+
end
6638
end
6739

6840
# Use logger created by #create_stdlib_logger to log concurrent-ruby messages.
6941
def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr)
70-
logger = create_stdlib_logger level, output
71-
Concurrent.global_logger = lambda do |level, progname, message = nil, &block|
72-
logger.add level, message, progname, &block
73-
end
42+
Concurrent.global_logger = create_stdlib_logger level, output
7443
end
7544

45+
# Suppresses all output when used for logging.
46+
NULL_LOGGER = lambda { |level, progname, message = nil, &block| }
47+
48+
# @!visibility private
49+
GLOBAL_LOGGER = AtomicReference.new(create_stdlib_logger(Logger::WARN))
50+
private_constant :GLOBAL_LOGGER
51+
52+
def self.global_logger
53+
GLOBAL_LOGGER.value
54+
end
55+
56+
def self.global_logger=(value)
57+
GLOBAL_LOGGER.value = value
58+
end
59+
60+
# @!visibility private
61+
GLOBAL_FAST_EXECUTOR = Delay.new { Concurrent.new_fast_executor(auto_terminate: true) }
62+
private_constant :GLOBAL_FAST_EXECUTOR
63+
64+
# @!visibility private
65+
GLOBAL_IO_EXECUTOR = Delay.new { Concurrent.new_io_executor(auto_terminate: true) }
66+
private_constant :GLOBAL_IO_EXECUTOR
67+
68+
# @!visibility private
69+
GLOBAL_TIMER_SET = Delay.new { TimerSet.new(auto_terminate: true) }
70+
private_constant :GLOBAL_TIMER_SET
71+
72+
# @!visibility private
73+
GLOBAL_IMMEDIATE_EXECUTOR = ImmediateExecutor.new
74+
private_constant :GLOBAL_IMMEDIATE_EXECUTOR
75+
7676
# Disables AtExit handlers including pool auto-termination handlers.
7777
# When disabled it will be the application programmer's responsibility
7878
# to ensure that the handlers are shutdown properly prior to application
@@ -275,4 +275,18 @@ def self.configuration
275275
def self.configure
276276
yield(configuration)
277277
end
278+
279+
# for dependency reasons this check cannot be in concurrent/synchronization
280+
if Concurrent.on_jruby?
281+
require 'java'
282+
283+
version_string = java.lang.System.getProperties['java.runtime.version']
284+
version = version_string.split('.', 3)[0..1].map(&:to_i)
285+
if (version <=> [1, 8]) < 0
286+
deprecated <<-TXT.gsub(/^\s*\|/, '').chop, 0
287+
|Java 7 is deprecated, please use Java 8.
288+
|Java 7 support is only best effort, it may not work. It will be removed in next release (1.0).
289+
TXT
290+
end
291+
end
278292
end

0 commit comments

Comments
 (0)