Skip to content

Commit 9e2471e

Browse files
committed
updated ReactiveSocketFactory
1 parent ac77485 commit 9e2471e

File tree

2 files changed

+110
-3
lines changed

2 files changed

+110
-3
lines changed

src/main/java/io/reactivesocket/ReactiveSocketFactory.java

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,111 @@
1515
*/
1616
package io.reactivesocket;
1717

18+
import io.reactivesocket.internal.rx.EmptySubscription;
1819
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.Subscriber;
21+
import org.reactivestreams.Subscription;
1922

20-
import java.net.SocketAddress;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.ScheduledExecutorService;
2125
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.TimeoutException;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicReference;
2229

2330
@FunctionalInterface
24-
public interface ReactiveSocketFactory {
25-
Publisher<ReactiveSocket> call(SocketAddress address, long timeout, TimeUnit timeUnit);
31+
public interface ReactiveSocketFactory<T, R extends ReactiveSocket> {
32+
33+
Publisher<R> call(T t);
34+
35+
/**
36+
* Gets a socket in a blocking manner
37+
* @param t configuration to create the reactive socket
38+
* @return blocks on create the socket
39+
*/
40+
default R callAndWait(T t) {
41+
AtomicReference<R> reference = new AtomicReference<>();
42+
AtomicReference<Throwable> error = new AtomicReference<>();
43+
CountDownLatch latch = new CountDownLatch(1);
44+
45+
call(t)
46+
.subscribe(new Subscriber<R>() {
47+
@Override
48+
public void onSubscribe(Subscription s) {
49+
s.request(1);
50+
}
51+
52+
@Override
53+
public void onNext(R reactiveSocket) {
54+
reference.set(reactiveSocket);
55+
}
56+
57+
@Override
58+
public void onError(Throwable t) {
59+
error.set(t);
60+
latch.countDown();
61+
}
62+
63+
@Override
64+
public void onComplete() {
65+
latch.countDown();
66+
}
67+
});
68+
69+
if (error.get() != null) {
70+
throw new RuntimeException(error.get());
71+
} else {
72+
return reference.get();
73+
}
74+
}
75+
76+
/**
77+
*
78+
* @param t the configuration used to create the reactive socket
79+
* @param timeout timeout
80+
* @param timeUnit timeout units
81+
* @param executorService ScheduledExecutorService to schedule the timeout on
82+
* @return
83+
*/
84+
default Publisher<R> call(T t, long timeout, TimeUnit timeUnit, ScheduledExecutorService executorService) {
85+
Publisher<R> reactiveSocketPublisher = subscriber -> {
86+
AtomicBoolean complete = new AtomicBoolean();
87+
subscriber.onSubscribe(EmptySubscription.INSTANCE);
88+
call(t)
89+
.subscribe(new Subscriber<R>() {
90+
@Override
91+
public void onSubscribe(Subscription s) {
92+
s.request(1);
93+
}
94+
95+
@Override
96+
public void onNext(R reactiveSocket) {
97+
subscriber.onNext(reactiveSocket);
98+
}
99+
100+
@Override
101+
public void onError(Throwable t) {
102+
subscriber.onError(t);
103+
}
104+
105+
@Override
106+
public void onComplete() {
107+
if (!complete.get()) {
108+
complete.set(true);
109+
subscriber.onComplete();
110+
}
111+
}
112+
});
113+
114+
executorService.schedule(() -> {
115+
if (!complete.get()) {
116+
complete.set(true);
117+
subscriber.onError(new TimeoutException());
118+
}
119+
}, timeout, timeUnit);
120+
};
121+
122+
return reactiveSocketPublisher;
123+
}
124+
26125
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.reactivesocket;
2+
3+
import java.net.SocketAddress;
4+
5+
@FunctionalInterface
6+
public interface ReactiveSocketSocketAddressFactory<R extends ReactiveSocket> extends ReactiveSocketFactory<SocketAddress, R> {
7+
8+
}

0 commit comments

Comments
 (0)