Skip to content

Commit 378cbf9

Browse files
authored
2.x: Cleanup code style, commas, spaces, docs (#6255)
1 parent 8654393 commit 378cbf9

File tree

62 files changed

+122
-101
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+122
-101
lines changed

src/jmh/java/io/reactivex/parallel/ParallelPerf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
@BenchmarkMode(Mode.Throughput)
2929
@Warmup(iterations = 5)
3030
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
31-
@Fork(value = 1,jvmArgsAppend = { "-XX:MaxInlineLevel=20" })
31+
@Fork(value = 1, jvmArgsAppend = { "-XX:MaxInlineLevel=20" })
3232
@OutputTimeUnit(TimeUnit.SECONDS)
3333
@State(Scope.Thread)
3434
public class ParallelPerf implements Function<Integer, Integer> {

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ public boolean isEmpty() {
430430
}
431431
}
432432

433-
static final class EvictionAction<K, V> implements Consumer<GroupedUnicast<K,V>> {
433+
static final class EvictionAction<K, V> implements Consumer<GroupedUnicast<K, V>> {
434434

435435
final Queue<GroupedUnicast<K, V>> evictedGroups;
436436

@@ -439,7 +439,7 @@ static final class EvictionAction<K, V> implements Consumer<GroupedUnicast<K,V>>
439439
}
440440

441441
@Override
442-
public void accept(GroupedUnicast<K,V> value) {
442+
public void accept(GroupedUnicast<K, V> value) {
443443
evictedGroups.offer(value);
444444
}
445445
}

src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public MpscLinkedQueue() {
3636
consumerNode = new AtomicReference<LinkedQueueNode<T>>();
3737
LinkedQueueNode<T> node = new LinkedQueueNode<T>();
3838
spConsumerNode(node);
39-
xchgProducerNode(node);// this ensures correct construction: StoreLoad
39+
xchgProducerNode(node); // this ensures correct construction: StoreLoad
4040
}
4141

4242
/**

src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,12 @@ public E poll() {
8989
final long index = consumerIndex.get();
9090
final int offset = calcElementOffset(index);
9191
// local load of field to avoid repeated loads after volatile reads
92-
final E e = lvElement(offset);// LoadLoad
92+
final E e = lvElement(offset); // LoadLoad
9393
if (null == e) {
9494
return null;
9595
}
9696
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
97-
soElement(offset, null);// StoreStore
97+
soElement(offset, null); // StoreStore
9898
return e;
9999
}
100100

src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ public boolean offer(final T e) {
9292
}
9393

9494
private boolean writeToQueue(final AtomicReferenceArray<Object> buffer, final T e, final long index, final int offset) {
95-
soElement(buffer, offset, e);// StoreStore
96-
soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
95+
soElement(buffer, offset, e); // StoreStore
96+
soProducerIndex(index + 1); // this ensures atomic write of long on 32bit platforms
9797
return true;
9898
}
9999

@@ -103,11 +103,11 @@ private void resize(final AtomicReferenceArray<Object> oldBuffer, final long cur
103103
final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
104104
producerBuffer = newBuffer;
105105
producerLookAhead = currIndex + mask - 1;
106-
soElement(newBuffer, offset, e);// StoreStore
106+
soElement(newBuffer, offset, e); // StoreStore
107107
soNext(oldBuffer, newBuffer);
108108
soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is
109109
// inserted
110-
soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
110+
soProducerIndex(currIndex + 1); // this ensures correctness on 32bit platforms
111111
}
112112

113113
private void soNext(AtomicReferenceArray<Object> curr, AtomicReferenceArray<Object> next) {
@@ -135,11 +135,11 @@ public T poll() {
135135
final long index = lpConsumerIndex();
136136
final int mask = consumerMask;
137137
final int offset = calcWrappedOffset(index, mask);
138-
final Object e = lvElement(buffer, offset);// LoadLoad
138+
final Object e = lvElement(buffer, offset); // LoadLoad
139139
boolean isNextBuffer = e == HAS_NEXT;
140140
if (null != e && !isNextBuffer) {
141-
soElement(buffer, offset, null);// StoreStore
142-
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
141+
soElement(buffer, offset, null); // StoreStore
142+
soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms
143143
return (T) e;
144144
} else if (isNextBuffer) {
145145
return newBufferPoll(lvNextBufferAndUnlink(buffer, mask + 1), index, mask);
@@ -152,10 +152,10 @@ public T poll() {
152152
private T newBufferPoll(AtomicReferenceArray<Object> nextBuffer, final long index, final int mask) {
153153
consumerBuffer = nextBuffer;
154154
final int offsetInNew = calcWrappedOffset(index, mask);
155-
final T n = (T) lvElement(nextBuffer, offsetInNew);// LoadLoad
155+
final T n = (T) lvElement(nextBuffer, offsetInNew); // LoadLoad
156156
if (null != n) {
157-
soElement(nextBuffer, offsetInNew, null);// StoreStore
158-
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
157+
soElement(nextBuffer, offsetInNew, null); // StoreStore
158+
soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms
159159
}
160160
return n;
161161
}
@@ -166,7 +166,7 @@ public T peek() {
166166
final long index = lpConsumerIndex();
167167
final int mask = consumerMask;
168168
final int offset = calcWrappedOffset(index, mask);
169-
final Object e = lvElement(buffer, offset);// LoadLoad
169+
final Object e = lvElement(buffer, offset); // LoadLoad
170170
if (e == HAS_NEXT) {
171171
return newBufferPeek(lvNextBufferAndUnlink(buffer, mask + 1), index, mask);
172172
}
@@ -178,7 +178,7 @@ public T peek() {
178178
private T newBufferPeek(AtomicReferenceArray<Object> nextBuffer, final long index, final int mask) {
179179
consumerBuffer = nextBuffer;
180180
final int offsetInNew = calcWrappedOffset(index, mask);
181-
return (T) lvElement(nextBuffer, offsetInNew);// LoadLoad
181+
return (T) lvElement(nextBuffer, offsetInNew); // LoadLoad
182182
}
183183

184184
@Override
@@ -277,13 +277,13 @@ public boolean offer(T first, T second) {
277277
producerBuffer = newBuffer;
278278

279279
pi = calcWrappedOffset(p, m);
280-
soElement(newBuffer, pi + 1, second);// StoreStore
280+
soElement(newBuffer, pi + 1, second); // StoreStore
281281
soElement(newBuffer, pi, first);
282282
soNext(buffer, newBuffer);
283283

284284
soElement(buffer, pi, HAS_NEXT); // new buffer is visible after element is
285285

286-
soProducerIndex(p + 2);// this ensures correctness on 32bit platforms
286+
soProducerIndex(p + 2); // this ensures correctness on 32bit platforms
287287
}
288288

289289
return true;

src/main/java/io/reactivex/internal/schedulers/IoScheduler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ public IoScheduler() {
151151
}
152152

153153
/**
154+
* Constructs an IoScheduler with the given thread factory and starts the pool of workers.
154155
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
155156
* system properties for configuring new thread creation. Cannot be null.
156157
*/

src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public SingleScheduler() {
5353
}
5454

5555
/**
56+
* Constructs a SingleScheduler with the given ThreadFactory and prepares the
57+
* single scheduler thread.
5658
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
5759
* system properties for configuring new thread creation. Cannot be null.
5860
*/

src/main/java/io/reactivex/internal/util/ExceptionHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public static String timeoutMessage(long timeout, TimeUnit unit) {
129129
+ unit.toString().toLowerCase()
130130
+ " and has been terminated.";
131131
}
132-
132+
133133
static final class Termination extends Throwable {
134134

135135
private static final long serialVersionUID = -4649703670690200604L;

src/main/java/io/reactivex/processors/UnicastProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancel
255255
* @since 2.0
256256
*/
257257
UnicastProcessor(int capacityHint) {
258-
this(capacityHint,null, true);
258+
this(capacityHint, null, true);
259259
}
260260

261261
/**

src/test/java/io/reactivex/exceptions/ExceptionsTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public void accept(Integer t1) {
5959
}
6060

6161
/**
62+
* Outdated test: Observer should not suppress errors from onCompleted.
6263
* https://github.com/ReactiveX/RxJava/issues/3885
6364
*/
6465
@Ignore("v2 components should not throw")
@@ -200,6 +201,7 @@ public void onNext(Integer t) {
200201
}
201202

202203
/**
204+
* Outdated test: throwing from onError handler.
203205
* https://github.com/ReactiveX/RxJava/issues/969
204206
*/
205207
@Ignore("v2 components should not throw")
@@ -237,6 +239,7 @@ public void onNext(Object o) {
237239
}
238240

239241
/**
242+
* Outdated test: throwing from onError.
240243
* https://github.com/ReactiveX/RxJava/issues/2998
241244
* @throws Exception on arbitrary errors
242245
*/
@@ -276,6 +279,7 @@ public void onNext(GroupedObservable<Integer, Integer> integerIntegerGroupedObse
276279
}
277280

278281
/**
282+
* Outdated test: throwing from onError.
279283
* https://github.com/ReactiveX/RxJava/issues/2998
280284
* @throws Exception on arbitrary errors
281285
*/

0 commit comments

Comments
 (0)