Skip to content

Commit ccbd483

Browse files
akarnokdakarnokd
authored andcommitted
Changed lazySet to regular volatile write to avoid potential visibility issues.
1 parent 8b8c721 commit ccbd483

14 files changed

+45
-60
lines changed

rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package rx.operators;
1717

1818
import java.util.Iterator;
19-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2019

2120
import rx.Observable;
2221
import rx.Subscriber;
@@ -80,27 +79,24 @@ public void remove() {
8079
private static class MostRecentObserver<T> extends Subscriber<T> {
8180
static final NotificationLite<Object> nl = NotificationLite.instance();
8281
volatile Object value;
83-
@SuppressWarnings("rawtypes")
84-
static final AtomicReferenceFieldUpdater<MostRecentObserver, Object> VALUE_UPDATER
85-
= AtomicReferenceFieldUpdater.newUpdater(MostRecentObserver.class, Object.class, "value");
8682

8783
private MostRecentObserver(T value) {
88-
VALUE_UPDATER.lazySet(this, nl.next(value));
84+
this.value = nl.next(value);
8985
}
9086

9187
@Override
9288
public void onCompleted() {
93-
VALUE_UPDATER.lazySet(this, nl.completed());
89+
value = nl.completed();
9490
}
9591

9692
@Override
9793
public void onError(Throwable e) {
98-
VALUE_UPDATER.lazySet(this, nl.error(e));
94+
value = nl.error(e);
9995
}
10096

10197
@Override
10298
public void onNext(T args) {
103-
VALUE_UPDATER.lazySet(this, nl.next(args));
99+
value = nl.next(args);
104100
}
105101

106102
private boolean isCompleted() {

rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public Notification<? extends T> takeNext() throws InterruptedException {
178178
return buf.take();
179179
}
180180
void setWaiting(int value) {
181-
WAITING_UPDATER.lazySet(this, value);
181+
waiting = value;
182182
}
183183
}
184184
}

rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ boolean casFirst(int expected, int next) {
7070
return FIRST_UPDATER.compareAndSet(this, expected, next);
7171
}
7272
void setObserverRef(Observer<? super T> o) {
73-
OBSERVER_UPDATER.lazySet(this, o);
73+
observerRef = o;
7474
}
7575
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
7676
return OBSERVER_UPDATER.compareAndSet(this, expected, next);

rxjava-core/src/main/java/rx/operators/OperatorMerge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public MergeSubscriber(Subscriber<T> actual, CompositeSubscription childrenSubsc
5757
super(actual);
5858
this.actual = actual;
5959
this.childrenSubscriptions = childrenSubscriptions;
60-
WIP_UPDATER.lazySet(this, 1);
60+
this.wip = 1;
6161
}
6262

6363
@Override

rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public MergeDelayErrorSubscriber(Subscriber<? super T> s, CompositeSubscription
6767
super(s);
6868
this.s = s;
6969
this.csub = csub;
70-
WIP_UPDATER.lazySet(this, 1);
70+
this.wip = 1;
7171
}
7272

7373
@Override

rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public SourceSubscriber(Subscriber<R> s, CompositeSubscription csub,
7979
this.csub = csub;
8080
this.collectionSelector = collectionSelector;
8181
this.resultSelector = resultSelector;
82-
WIP_UPDATER.lazySet(this, 1);
82+
this.wip = 1;
8383
}
8484

8585
@Override

rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public SourceSubscriber(Subscriber<R> s, CompositeSubscription csub, Func1<? sup
7171
this.onNext = onNext;
7272
this.onError = onError;
7373
this.onCompleted = onCompleted;
74-
WIP_UPDATER.lazySet(this, 1);
74+
this.wip = 1;
7575
}
7676

7777
@Override

rxjava-core/src/main/java/rx/operators/OperatorMergeMaxConcurrent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public SourceSubscriber(int maxConcurrency, Subscriber<T> s, CompositeSubscripti
7272
this.csub = csub;
7373
this.guard = new Object();
7474
this.queue = new LinkedList<Observable<? extends T>>();
75-
WIP_UPDATER.lazySet(this, 1);
75+
this.wip = 1;
7676
}
7777

7878
@Override

rxjava-core/src/main/java/rx/operators/OperatorSampleWithTime.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public SamplerSubscriber(Subscriber<? super T> subscriber) {
7272
}
7373
@Override
7474
public void onNext(T t) {
75-
VALUE_UPDATER.lazySet(this, t);
75+
value = t;
7676
}
7777

7878
@Override

rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package rx.plugins;
1717

18-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18+
import java.util.concurrent.atomic.AtomicReference;
1919

2020
/**
2121
* Registry for plugin implementations that allows global override and handles the retrieval of correct implementation based on order of precedence:
@@ -29,17 +29,10 @@
2929
public class RxJavaPlugins {
3030
private final static RxJavaPlugins INSTANCE = new RxJavaPlugins();
3131

32-
volatile RxJavaErrorHandler errorHandler;
33-
volatile RxJavaObservableExecutionHook observableExecutionHook;
34-
volatile RxJavaDefaultSchedulers schedulerOverrides;
32+
private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
33+
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
34+
private final AtomicReference<RxJavaDefaultSchedulers> schedulerOverrides = new AtomicReference<RxJavaDefaultSchedulers>();
3535

36-
static final AtomicReferenceFieldUpdater<RxJavaPlugins, RxJavaErrorHandler> ERROR_HANDLER_UPDATER
37-
= AtomicReferenceFieldUpdater.newUpdater(RxJavaPlugins.class, RxJavaErrorHandler.class, "errorHandler");
38-
static final AtomicReferenceFieldUpdater<RxJavaPlugins, RxJavaObservableExecutionHook> EXECUTION_HOOK_UPDATER
39-
= AtomicReferenceFieldUpdater.newUpdater(RxJavaPlugins.class, RxJavaObservableExecutionHook.class, "observableExecutionHook");
40-
static final AtomicReferenceFieldUpdater<RxJavaPlugins, RxJavaDefaultSchedulers> SCHEDULER_OVERRIDE_UPDATER
41-
= AtomicReferenceFieldUpdater.newUpdater(RxJavaPlugins.class, RxJavaDefaultSchedulers.class, "schedulerOverrides");
42-
4336
public static RxJavaPlugins getInstance() {
4437
return INSTANCE;
4538
}
@@ -48,9 +41,9 @@ public static RxJavaPlugins getInstance() {
4841

4942
}
5043

51-
/* package accessible for unit tests */void reset() {
52-
ERROR_HANDLER_UPDATER.lazySet(this, null);
53-
EXECUTION_HOOK_UPDATER.lazySet(this, null);
44+
/* package accessible for ujnit tests */void reset() {
45+
INSTANCE.errorHandler.set(null);
46+
INSTANCE.observableExecutionHook.set(null);
5447
}
5548

5649
/**
@@ -62,19 +55,19 @@ public static RxJavaPlugins getInstance() {
6255
* @return {@link RxJavaErrorHandler} implementation to use
6356
*/
6457
public RxJavaErrorHandler getErrorHandler() {
65-
if (errorHandler == null) {
58+
if (errorHandler.get() == null) {
6659
// check for an implementation from System.getProperty first
6760
Object impl = getPluginImplementationViaProperty(RxJavaErrorHandler.class);
6861
if (impl == null) {
6962
// nothing set via properties so initialize with default
70-
ERROR_HANDLER_UPDATER.compareAndSet(this, null, RxJavaErrorHandlerDefault.getInstance());
63+
errorHandler.compareAndSet(null, RxJavaErrorHandlerDefault.getInstance());
7164
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
7265
} else {
7366
// we received an implementation from the system property so use it
74-
ERROR_HANDLER_UPDATER.compareAndSet(this, null, (RxJavaErrorHandler) impl);
67+
errorHandler.compareAndSet(null, (RxJavaErrorHandler) impl);
7568
}
7669
}
77-
return errorHandler;
70+
return errorHandler.get();
7871
}
7972

8073
/**
@@ -86,8 +79,8 @@ public RxJavaErrorHandler getErrorHandler() {
8679
* if called more than once or after the default was initialized (if usage occurs before trying to register)
8780
*/
8881
public void registerErrorHandler(RxJavaErrorHandler impl) {
89-
if (!ERROR_HANDLER_UPDATER.compareAndSet(this, null, impl)) {
90-
throw new IllegalStateException("Another strategy was already registered: " + errorHandler);
82+
if (!errorHandler.compareAndSet(null, impl)) {
83+
throw new IllegalStateException("Another strategy was already registered: " + errorHandler.get());
9184
}
9285
}
9386

@@ -100,19 +93,19 @@ public void registerErrorHandler(RxJavaErrorHandler impl) {
10093
* @return {@link RxJavaObservableExecutionHook} implementation to use
10194
*/
10295
public RxJavaObservableExecutionHook getObservableExecutionHook() {
103-
if (observableExecutionHook == null) {
96+
if (observableExecutionHook.get() == null) {
10497
// check for an implementation from System.getProperty first
10598
Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class);
10699
if (impl == null) {
107100
// nothing set via properties so initialize with default
108-
EXECUTION_HOOK_UPDATER.compareAndSet(this, null, RxJavaObservableExecutionHookDefault.getInstance());
101+
observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
109102
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
110103
} else {
111104
// we received an implementation from the system property so use it
112-
EXECUTION_HOOK_UPDATER.compareAndSet(this, null, (RxJavaObservableExecutionHook) impl);
105+
observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl);
113106
}
114107
}
115-
return observableExecutionHook;
108+
return observableExecutionHook.get();
116109
}
117110

118111
/**
@@ -124,8 +117,8 @@ public RxJavaObservableExecutionHook getObservableExecutionHook() {
124117
* if called more than once or after the default was initialized (if usage occurs before trying to register)
125118
*/
126119
public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl) {
127-
if (!EXECUTION_HOOK_UPDATER.compareAndSet(this, null, impl)) {
128-
throw new IllegalStateException("Another strategy was already registered: " + observableExecutionHook);
120+
if (!observableExecutionHook.compareAndSet(null, impl)) {
121+
throw new IllegalStateException("Another strategy was already registered: " + observableExecutionHook.get());
129122
}
130123
}
131124

@@ -168,19 +161,19 @@ private static Object getPluginImplementationViaProperty(Class<?> pluginClass) {
168161
* @return {@link RxJavaErrorHandler} implementation to use
169162
*/
170163
public RxJavaDefaultSchedulers getDefaultSchedulers() {
171-
if (schedulerOverrides == null) {
164+
if (schedulerOverrides.get() == null) {
172165
// check for an implementation from System.getProperty first
173166
Object impl = getPluginImplementationViaProperty(RxJavaDefaultSchedulers.class);
174167
if (impl == null) {
175168
// nothing set via properties so initialize with default
176-
SCHEDULER_OVERRIDE_UPDATER.compareAndSet(this, null, RxJavaDefaultSchedulersDefault.getInstance());
169+
schedulerOverrides.compareAndSet(null, RxJavaDefaultSchedulersDefault.getInstance());
177170
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
178171
} else {
179172
// we received an implementation from the system property so use it
180-
SCHEDULER_OVERRIDE_UPDATER.compareAndSet(this, null, (RxJavaDefaultSchedulers) impl);
173+
schedulerOverrides.compareAndSet(null, (RxJavaDefaultSchedulers) impl);
181174
}
182175
}
183-
return schedulerOverrides;
176+
return schedulerOverrides.get();
184177
}
185178

186179
/**
@@ -192,8 +185,8 @@ public RxJavaDefaultSchedulers getDefaultSchedulers() {
192185
* if called more than once or after the default was initialized (if usage occurs before trying to register)
193186
*/
194187
public void registerDefaultSchedulers(RxJavaDefaultSchedulers impl) {
195-
if (!SCHEDULER_OVERRIDE_UPDATER.compareAndSet(this, null, impl)) {
196-
throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides);
188+
if (!schedulerOverrides.compareAndSet(null, impl)) {
189+
throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides.get());
197190
}
198191
}
199192
}

0 commit comments

Comments
 (0)