Skip to content

Commit b9b2316

Browse files
committed
Inline AtomicReference, delete unused class
1 parent 95db46c commit b9b2316

File tree

2 files changed

+14
-36
lines changed

2 files changed

+14
-36
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ protected void subscribeActual(Observer<? super T> t) {
9999
* @return true if the cache has observers
100100
*/
101101
/* public */ boolean hasObservers() {
102-
return multicaster.observers.get().length != 0;
102+
return multicaster.get().length != 0;
103103
}
104104

105105
/**
@@ -110,18 +110,13 @@ protected void subscribeActual(Observer<? super T> t) {
110110
return multicaster.size;
111111
}
112112

113-
static final class Multicaster<T> implements Observer<T> {
113+
static final class Multicaster<T> extends AtomicReference<CacheDisposable<T>[]> implements Observer<T> {
114114

115115
/**
116116
* The number of items per cached nodes.
117117
*/
118118
final int capacityHint;
119119

120-
/**
121-
* The current known array of observer state to notify.
122-
*/
123-
final AtomicReference<CacheDisposable<T>[]> observers;
124-
125120
/**
126121
* The total number of elements in the list available for reads.
127122
*/
@@ -138,7 +133,7 @@ static final class Multicaster<T> implements Observer<T> {
138133
int tailOffset;
139134

140135
/**
141-
* If {@link #observers} is {@link #TERMINATED}, this holds the terminal error if not null.
136+
* If the observers are {@link #TERMINATED}, this holds the terminal error if not null.
142137
*/
143138
Throwable error;
144139

@@ -149,19 +144,19 @@ static final class Multicaster<T> implements Observer<T> {
149144

150145
@SuppressWarnings("unchecked")
151146
Multicaster(int capacityHint, final Node<T> head) {
152-
this.capacityHint = capacityHint;
147+
super(EMPTY);
153148
this.tail = head;
154-
this.observers = new AtomicReference<>(EMPTY);
149+
this.capacityHint = capacityHint;
155150
}
156151

157152
/**
158-
* Atomically adds the consumer to the {@link #observers} copy-on-write array
153+
* Atomically adds the consumer to the observers copy-on-write array
159154
* if the source has not yet terminated.
160155
* @param consumer the consumer to add
161156
*/
162157
void add(CacheDisposable<T> consumer) {
163158
for (;;) {
164-
CacheDisposable<T>[] current = observers.get();
159+
CacheDisposable<T>[] current = get();
165160
if (current == TERMINATED) {
166161
return;
167162
}
@@ -172,20 +167,20 @@ void add(CacheDisposable<T> consumer) {
172167
System.arraycopy(current, 0, next, 0, n);
173168
next[n] = consumer;
174169

175-
if (observers.compareAndSet(current, next)) {
170+
if (compareAndSet(current, next)) {
176171
return;
177172
}
178173
}
179174
}
180175

181176
/**
182-
* Atomically removes the consumer from the {@link #observers} copy-on-write array.
177+
* Atomically removes the consumer from the observers copy-on-write array.
183178
* @param consumer the consumer to remove
184179
*/
185180
@SuppressWarnings("unchecked")
186181
void remove(CacheDisposable<T> consumer) {
187182
for (;;) {
188-
CacheDisposable<T>[] current = observers.get();
183+
CacheDisposable<T>[] current = get();
189184
int n = current.length;
190185
if (n == 0) {
191186
return;
@@ -212,7 +207,7 @@ void remove(CacheDisposable<T> consumer) {
212207
System.arraycopy(current, j + 1, next, j, n - j - 1);
213208
}
214209

215-
if (observers.compareAndSet(current, next)) {
210+
if (compareAndSet(current, next)) {
216211
return;
217212
}
218213
}
@@ -318,7 +313,7 @@ public void onNext(T t) {
318313
this.tailOffset = tailOffset + 1;
319314
}
320315
size++;
321-
for (CacheDisposable<T> consumer : observers.get()) {
316+
for (CacheDisposable<T> consumer : get()) {
322317
replay(consumer);
323318
}
324319
}
@@ -330,7 +325,7 @@ public void onError(Throwable t) {
330325
done = true;
331326
// No additional events will arrive, so now we can clear the 'tail' reference
332327
tail = null;
333-
for (CacheDisposable<T> consumer : observers.getAndSet(TERMINATED)) {
328+
for (CacheDisposable<T> consumer : getAndSet(TERMINATED)) {
334329
replay(consumer);
335330
}
336331
}
@@ -341,7 +336,7 @@ public void onComplete() {
341336
done = true;
342337
// No additional events will arrive, so now we can clear the 'tail' reference
343338
tail = null;
344-
for (CacheDisposable<T> consumer : observers.getAndSet(TERMINATED)) {
339+
for (CacheDisposable<T> consumer : getAndSet(TERMINATED)) {
345340
replay(consumer);
346341
}
347342
}

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -408,21 +408,4 @@ public void accept(byte[] v) throws Exception {
408408
+ " -> " + after.get() / 1024.0 / 1024.0);
409409
}
410410
}
411-
412-
static final class Payload {
413-
private final int value;
414-
415-
Payload(int value) {
416-
this.value = value;
417-
}
418-
419-
int value() {
420-
return value;
421-
}
422-
423-
@Override
424-
public String toString() {
425-
return "Payload(" + value + ")";
426-
}
427-
}
428411
}

0 commit comments

Comments
 (0)