Skip to content

Commit 0d04c0c

Browse files
committed
Remove RxJava Scheduler.
1 parent 36c51b6 commit 0d04c0c

File tree

2 files changed

+11
-273
lines changed

2 files changed

+11
-273
lines changed

durian-swt/src/main/java/com/diffplug/common/swt/SwtExec.java

Lines changed: 4 additions & 264 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2020-2022 DiffPlug
2+
* Copyright (C) 2020-2025 DiffPlug
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,30 +27,19 @@
2727
import com.diffplug.common.util.concurrent.MoreExecutors;
2828
import com.diffplug.common.util.concurrent.Runnables;
2929
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
30-
import io.reactivex.Scheduler;
31-
import io.reactivex.disposables.CompositeDisposable;
32-
import io.reactivex.disposables.Disposable;
33-
import io.reactivex.disposables.Disposables;
34-
import io.reactivex.schedulers.Schedulers;
35-
import java.util.HashSet;
3630
import java.util.List;
3731
import java.util.Objects;
38-
import java.util.Set;
3932
import java.util.concurrent.AbstractExecutorService;
4033
import java.util.concurrent.Callable;
4134
import java.util.concurrent.Delayed;
4235
import java.util.concurrent.ExecutionException;
4336
import java.util.concurrent.Executor;
44-
import java.util.concurrent.Future;
45-
import java.util.concurrent.FutureTask;
4637
import java.util.concurrent.RejectedExecutionException;
4738
import java.util.concurrent.RunnableFuture;
4839
import java.util.concurrent.ScheduledExecutorService;
4940
import java.util.concurrent.ScheduledFuture;
5041
import java.util.concurrent.TimeUnit;
5142
import java.util.concurrent.TimeoutException;
52-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
53-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
5443
import java.util.function.Function;
5544
import java.util.function.Supplier;
5645
import kotlin.coroutines.CoroutineContext;
@@ -332,7 +321,7 @@ public RxExecutor getRxExecutor() {
332321
}
333322

334323
SwtExec() {
335-
this(exec -> Rx.callbackOn(exec, new SwtScheduler(exec), new SwtDispatcher(exec)));
324+
this(exec -> Rx.callbackOn(exec, new SwtDispatcher(exec)));
336325
}
337326

338327
SwtExec(Function<SwtExec, RxExecutor> rxExecutorCreator) {
@@ -722,206 +711,6 @@ public MainCoroutineDispatcher getImmediate() {
722711
}
723712
}
724713

725-
/** Scheduler that runs tasks on Swt's event dispatch thread. */
726-
static final class SwtScheduler extends Scheduler {
727-
final SwtExec exec;
728-
729-
public SwtScheduler(SwtExec exec) {
730-
this.exec = exec;
731-
}
732-
733-
@Override
734-
public Worker createWorker() {
735-
return new SwtWorker(exec);
736-
}
737-
738-
static final class SwtWorker extends Scheduler.Worker {
739-
final SwtExec exec;
740-
741-
volatile boolean unsubscribed;
742-
743-
/** Set of active tasks, guarded by this. */
744-
Set<SwtScheduledAction> tasks;
745-
746-
public SwtWorker(SwtExec exec) {
747-
this.exec = exec;
748-
this.tasks = new HashSet<>();
749-
}
750-
751-
@Override
752-
public void dispose() {
753-
if (unsubscribed) {
754-
return;
755-
}
756-
unsubscribed = true;
757-
758-
Set<SwtScheduledAction> set;
759-
synchronized (this) {
760-
set = tasks;
761-
tasks = null;
762-
}
763-
764-
if (set != null) {
765-
for (SwtScheduledAction a : set) {
766-
a.cancelFuture();
767-
}
768-
}
769-
}
770-
771-
void remove(SwtScheduledAction a) {
772-
if (unsubscribed) {
773-
return;
774-
}
775-
synchronized (this) {
776-
if (unsubscribed) {
777-
return;
778-
}
779-
780-
tasks.remove(a);
781-
}
782-
}
783-
784-
@Override
785-
public boolean isDisposed() {
786-
return unsubscribed;
787-
}
788-
789-
@Override
790-
public Disposable schedule(Runnable action) {
791-
if (unsubscribed) {
792-
return Disposables.disposed();
793-
}
794-
795-
SwtScheduledAction a = new SwtScheduledAction(action, this);
796-
797-
synchronized (this) {
798-
if (unsubscribed) {
799-
return Disposables.disposed();
800-
}
801-
802-
tasks.add(a);
803-
}
804-
805-
exec.execute(a);
806-
807-
if (unsubscribed) {
808-
a.cancel();
809-
return Disposables.disposed();
810-
}
811-
812-
return a;
813-
}
814-
815-
@Override
816-
public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
817-
if (unsubscribed) {
818-
return Disposables.disposed();
819-
}
820-
821-
SwtScheduledAction a = new SwtScheduledAction(action, this);
822-
823-
synchronized (this) {
824-
if (unsubscribed) {
825-
return Disposables.disposed();
826-
}
827-
828-
tasks.add(a);
829-
}
830-
831-
Future<?> f = exec.schedule(a, delayTime, unit);
832-
833-
if (unsubscribed) {
834-
a.cancel();
835-
f.cancel(true);
836-
return Disposables.disposed();
837-
}
838-
839-
a.setFuture(f);
840-
841-
return a;
842-
}
843-
844-
/**
845-
* Represents a cancellable asynchronous Runnable that wraps an action
846-
* and manages the associated Worker lifecycle.
847-
*/
848-
static final class SwtScheduledAction implements Runnable, Disposable {
849-
final Runnable action;
850-
851-
final SwtWorker parent;
852-
853-
volatile Future<?> future;
854-
@SuppressWarnings("rawtypes")
855-
static final AtomicReferenceFieldUpdater<SwtScheduledAction, Future> FUTURE = AtomicReferenceFieldUpdater.newUpdater(SwtScheduledAction.class, Future.class, "future");
856-
857-
static final Future<?> CANCELLED = new FutureTask<>(() -> {}, null);
858-
859-
static final Future<?> FINISHED = new FutureTask<>(() -> {}, null);
860-
861-
volatile int state;
862-
static final AtomicIntegerFieldUpdater<SwtScheduledAction> STATE = AtomicIntegerFieldUpdater.newUpdater(SwtScheduledAction.class, "state");
863-
864-
static final int STATE_ACTIVE = 0;
865-
static final int STATE_FINISHED = 1;
866-
static final int STATE_CANCELLED = 2;
867-
868-
public SwtScheduledAction(Runnable action, SwtWorker parent) {
869-
this.action = action;
870-
this.parent = parent;
871-
}
872-
873-
@Override
874-
public void run() {
875-
if (!parent.unsubscribed && state == STATE_ACTIVE) {
876-
try {
877-
action.run();
878-
} finally {
879-
FUTURE.lazySet(this, FINISHED);
880-
if (STATE.compareAndSet(this, STATE_ACTIVE, STATE_FINISHED)) {
881-
parent.remove(this);
882-
}
883-
}
884-
}
885-
}
886-
887-
@Override
888-
public boolean isDisposed() {
889-
return state != STATE_ACTIVE;
890-
}
891-
892-
@Override
893-
public void dispose() {
894-
if (STATE.compareAndSet(this, STATE_ACTIVE, STATE_CANCELLED)) {
895-
parent.remove(this);
896-
}
897-
cancelFuture();
898-
}
899-
900-
void setFuture(Future<?> f) {
901-
if (FUTURE.compareAndSet(this, null, f)) {
902-
if (future != FINISHED) {
903-
f.cancel(true);
904-
}
905-
}
906-
}
907-
908-
void cancelFuture() {
909-
Future<?> f = future;
910-
if (f != CANCELLED && f != FINISHED) {
911-
f = FUTURE.getAndSet(this, CANCELLED);
912-
if (f != null && f != CANCELLED && f != FINISHED) {
913-
f.cancel(true);
914-
}
915-
}
916-
}
917-
918-
void cancel() {
919-
state = STATE_CANCELLED;
920-
}
921-
}
922-
}
923-
}
924-
925714
/** Global executor for actions which should only execute immediately on the SWT thread. */
926715
private static SwtExec swtOnly;
927716

@@ -937,7 +726,7 @@ void cancel() {
937726
@SuppressFBWarnings(value = "LI_LAZY_INIT_STATIC", justification = "This race condition is fine, see comment in SwtExec.blocking()")
938727
public static SwtExec swtOnly() {
939728
if (swtOnly == null) {
940-
swtOnly = new SwtExec(exec -> Rx.callbackOn(exec, new SwtOnlyScheduler(), new SwtOnlyDispatcher())) {
729+
swtOnly = new SwtExec(exec -> Rx.callbackOn(exec, new SwtOnlyDispatcher())) {
941730
@Override
942731
public void execute(Runnable runnable) {
943732
requireNonNull(runnable);
@@ -968,63 +757,14 @@ public void dispatch(@NotNull CoroutineContext coroutineContext, @NotNull Runnab
968757
}
969758
}
970759

971-
/**
972-
* Copied straight from rx.schedulers.ImmediateScheduler,
973-
* but checks for the SWT thread before running stuff,
974-
* and handles future-scheduling correctly.
975-
*/
976-
static final class SwtOnlyScheduler extends Scheduler {
977-
@Override
978-
public Worker createWorker() {
979-
return new InnerImmediateScheduler();
980-
}
981-
982-
private static final class InnerImmediateScheduler extends Scheduler.Worker {
983-
final Disposable innerSubscription = Disposables.empty();
984-
985-
@Override
986-
public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
987-
CompositeDisposable sub = new CompositeDisposable();
988-
Future<?> future = SwtExec.async().schedule(() -> {
989-
if (!sub.isDisposed()) {
990-
action.run();
991-
sub.dispose();
992-
}
993-
}, delayTime, unit);
994-
sub.add(Disposables.fromFuture(future));
995-
return sub;
996-
}
997-
998-
@Override
999-
public Disposable schedule(Runnable action) {
1000-
if (Thread.currentThread() == swtThread) {
1001-
action.run();
1002-
} else {
1003-
SWT.error(SWT.ERROR_THREAD_INVALID_ACCESS);
1004-
}
1005-
return Disposables.disposed();
1006-
}
1007-
1008-
@Override
1009-
public void dispose() {
1010-
innerSubscription.dispose();
1011-
}
1012-
1013-
@Override
1014-
public boolean isDisposed() {
1015-
return innerSubscription.isDisposed();
1016-
}
1017-
}
1018-
}
1019-
1020760
private static class SameThreadCoroutineDispatcher extends CoroutineDispatcher {
1021761
@Override
1022762
public void dispatch(@NotNull CoroutineContext coroutineContext, @NotNull Runnable runnable) {
1023763
runnable.run();
1024764
}
1025765
}
1026766

1027-
private static final SwtExec sameThread = new SwtExec(exec -> Rx.callbackOn(MoreExecutors.directExecutor(), Schedulers.trampoline(), new SameThreadCoroutineDispatcher())) {
767+
private static final SwtExec sameThread = new SwtExec(exec -> Rx.callbackOn(MoreExecutors.directExecutor(), new SameThreadCoroutineDispatcher())) {
1028768
@Override
1029769
public void execute(Runnable runnable) {
1030770
requireNonNull(runnable);

durian-swt/src/test/java/com/diffplug/common/swt/SwtExecProfile.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 DiffPlug
2+
* Copyright (C) 2020-2025 DiffPlug
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,11 +21,11 @@
2121
import com.diffplug.common.debug.LapTimer;
2222
import com.diffplug.common.rx.Rx;
2323
import io.reactivex.disposables.Disposable;
24-
import io.reactivex.subjects.PublishSubject;
2524
import java.util.LinkedHashMap;
2625
import java.util.Map;
2726
import java.util.concurrent.RejectedExecutionException;
2827
import java.util.function.Consumer;
28+
import kotlinx.coroutines.flow.MutableSharedFlow;
2929
import org.eclipse.swt.widgets.Display;
3030
import org.eclipse.swt.widgets.Widget;
3131
import org.junit.Test;
@@ -131,18 +131,17 @@ public void addSwtExec(String name, SwtExec underTest) {
131131
public void run(Widget guard) {
132132
JuxtaProfiler profiler = new JuxtaProfiler();
133133
profiler.addTestNanoWrap2Sec("control", () -> {
134-
PublishSubject<Integer> subject = PublishSubject.create();
135-
subject.subscribe(val -> {});
134+
MutableSharedFlow<Integer> subject = Rx.INSTANCE.createEmitFlow();
136135
drain(subject);
137136
});
138137
toProfile.forEach((name, underTest) -> {
139138
profiler.addTest(name, new JuxtaProfiler.InitTimedCleanup(LapTimer.createNanoWrap2Sec()) {
140-
PublishSubject<Integer> subject;
139+
MutableSharedFlow<Integer> subject;
141140
Disposable sub;
142141

143142
@Override
144143
protected void init() throws Throwable {
145-
subject = PublishSubject.create();
144+
subject = Rx.INSTANCE.createEmitFlow();
146145
sub = underTest.guardOn(guard).subscribeDisposable(subject, val -> {});
147146
}
148147

@@ -162,11 +161,10 @@ protected void cleanup() throws Throwable {
162161
profiler.runRandomTrials(NUM_TRIALS);
163162
}
164163

165-
private static void drain(PublishSubject<Integer> subject) {
164+
private static void drain(MutableSharedFlow<Integer> subject) {
166165
for (int i = 0; i < EVENTS_TO_PUSH; ++i) {
167-
subject.onNext(0);
166+
Rx.emit(subject, 0);
168167
}
169-
subject.onComplete();
170168
}
171169
}
172170

0 commit comments

Comments
 (0)