Skip to content

Commit aa05464

Browse files
committed
Introduce Connector and availability for Connection
Problem The DuplexConnection class doesn't expose any way to probe its state, this is somewhat problematic and doesn't fit well in the current model where the availability is the composable way of chaining state. The ReactiveSocketFactory create a ReactiveSocket from an `address` (currently SocketAddress). There are places in the code where the concept of address is not needed and it leads to implicit dependency on it. Solution Add a `double availability()` method to the DuplexConnection interface, most implementation of this method will be straightforward (just returning 0.0 if the underlying resource is unavailable, 1.0 otherwise). Split the concept of ReactiveSocketFactory in two, the Factory and the Connector. The Connector is responsible for creating the ReactiveSocket from the address, and the factory can create a ReactiveSocket without any argument.
1 parent 61a45c0 commit aa05464

File tree

7 files changed

+42
-96
lines changed

7 files changed

+42
-96
lines changed

src/main/java/io/reactivesocket/DefaultReactiveSocket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,10 @@ public void addOutput(Frame f, Completable callback) {
440440
connection.addOutput(f, callback);
441441
}
442442

443+
@Override
444+
public double availability() {
445+
return connection.availability();
446+
}
443447
};
444448

445449
@Override

src/main/java/io/reactivesocket/DuplexConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,10 @@ default void addOutput(Frame frame, Completable callback) {
3838
s.onComplete();
3939
}, callback);
4040
}
41+
42+
/**
43+
* @return the availability of the underlying connection, a number in [0.0, 1.0]
44+
* (higher is better).
45+
*/
46+
double availability();
4147
}

src/main/java/io/reactivesocket/ReactiveSocket.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
* Interface for a connection that supports sending requests and receiving responses
2727
*/
2828
public interface ReactiveSocket extends AutoCloseable {
29-
Publisher<Payload> requestResponse(final Payload payload);
30-
3129
Publisher<Void> fireAndForget(final Payload payload);
3230

31+
Publisher<Payload> requestResponse(final Payload payload);
32+
3333
Publisher<Payload> requestStream(final Payload payload);
3434

3535
Publisher<Payload> requestSubscription(final Payload payload);

src/main/java/io/reactivesocket/ReactiveSocketFactory.java

Lines changed: 17 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -15,102 +15,29 @@
1515
*/
1616
package io.reactivesocket;
1717

18-
import io.reactivesocket.internal.rx.EmptySubscription;
1918
import org.reactivestreams.Publisher;
20-
import org.reactivestreams.Subscriber;
21-
import org.reactivestreams.Subscription;
22-
23-
import java.util.NoSuchElementException;
24-
import java.util.concurrent.CompletableFuture;
25-
import java.util.concurrent.ScheduledExecutorService;
26-
import java.util.concurrent.TimeUnit;
27-
import java.util.concurrent.TimeoutException;
28-
import java.util.concurrent.atomic.AtomicBoolean;
29-
30-
@FunctionalInterface
31-
public interface ReactiveSocketFactory<T, R extends ReactiveSocket> {
32-
33-
Publisher<R> call(T t);
19+
import java.net.SocketAddress;
3420

21+
/**
22+
* Factory of ReactiveSocket interface
23+
* This abstraction is useful for abstracting the creation of a ReactiveSocket
24+
* (e.g. inside the LoadBalancer which create ReactiveSocket as needed)
25+
*/
26+
public interface ReactiveSocketFactory<T> {
3527
/**
36-
* Gets a socket in a blocking manner
37-
* @param t configuration to create the reactive socket
38-
* @return blocks on create the socket
28+
* Construct the ReactiveSocket
29+
* @return
3930
*/
40-
default R callAndWait(T t) {
41-
CompletableFuture<R> future = new CompletableFuture<>();
42-
43-
call(t)
44-
.subscribe(new Subscriber<R>() {
45-
@Override
46-
public void onSubscribe(Subscription s) {
47-
s.request(1);
48-
}
49-
50-
@Override
51-
public void onNext(R reactiveSocket) {
52-
future.complete(reactiveSocket);
53-
}
54-
55-
@Override
56-
public void onError(Throwable t) {
57-
future.completeExceptionally(t);
58-
}
59-
60-
@Override
61-
public void onComplete() {
62-
future.completeExceptionally(new NoSuchElementException("Sequence contains no elements"));
63-
}
64-
});
65-
66-
return future.join();
67-
}
31+
Publisher<ReactiveSocket> apply();
6832

6933
/**
70-
*
71-
* @param t the configuration used to create the reactive socket
72-
* @param timeout timeout
73-
* @param timeUnit timeout units
74-
* @param executorService ScheduledExecutorService to schedule the timeout on
75-
* @return
34+
* @return a positive numbers representing the availability of the factory.
35+
* Higher is better, 0.0 means not available
7636
*/
77-
default Publisher<R> call(T t, long timeout, TimeUnit timeUnit, ScheduledExecutorService executorService) {
78-
Publisher<R> reactiveSocketPublisher = subscriber -> {
79-
AtomicBoolean complete = new AtomicBoolean();
80-
subscriber.onSubscribe(EmptySubscription.INSTANCE);
81-
call(t)
82-
.subscribe(new Subscriber<R>() {
83-
@Override
84-
public void onSubscribe(Subscription s) {
85-
s.request(1);
86-
}
87-
88-
@Override
89-
public void onNext(R reactiveSocket) {
90-
subscriber.onNext(reactiveSocket);
91-
}
92-
93-
@Override
94-
public void onError(Throwable t) {
95-
subscriber.onError(t);
96-
}
97-
98-
@Override
99-
public void onComplete() {
100-
if (complete.compareAndSet(false, true)) {
101-
subscriber.onComplete();
102-
}
103-
}
104-
});
105-
106-
executorService.schedule(() -> {
107-
if (complete.compareAndSet(false, true)) {
108-
subscriber.onError(new TimeoutException());
109-
}
110-
}, timeout, timeUnit);
111-
};
112-
113-
return reactiveSocketPublisher;
114-
}
37+
double availability();
11538

39+
/**
40+
* @return an identifier of the remote location
41+
*/
42+
T remote();
11643
}

src/main/java/io/reactivesocket/internal/Requester.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,14 +286,14 @@ private void assertStarted() {
286286
*/
287287
public double availability() {
288288
if (!honorLease) {
289-
return 1.0;
289+
return connection.availability();
290290
}
291291
final long now = System.currentTimeMillis();
292292
double available = 0.0;
293293
if (numberOfRemainingRequests > 0 && (now < ttlExpiration)) {
294294
available = 1.0;
295295
}
296-
return available;
296+
return available * connection.availability();
297297
}
298298

299299
/*
@@ -873,6 +873,7 @@ public void error(Throwable e) {
873873

874874
Publisher<Frame> keepaliveTicker =
875875
PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS);
876+
876877
connection.addOutput(keepaliveTicker,
877878
new Completable() {
878879
public void success() {}

src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ public void addOutput(Frame f, Completable callback) {
6464
callback.success();
6565
}
6666

67+
@Override
68+
public double availability() {
69+
return 1.0;
70+
}
71+
6772
@Override
6873
public Observable<Frame> getInput() {
6974
return toInput;
@@ -72,11 +77,9 @@ public Observable<Frame> getInput() {
7277
public void connectToServerConnection(PerfTestConnection serverConnection) {
7378
writeSubject.subscribe(serverConnection.toInput);
7479
serverConnection.writeSubject.subscribe(toInput);
75-
7680
}
7781

7882
@Override
7983
public void close() throws IOException {
80-
8184
}
8285
}

src/test/java/io/reactivesocket/TestConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public void addOutput(Frame f, Completable callback) {
4848
callback.success();
4949
}
5050

51+
@Override
52+
public double availability() {
53+
return 1.0;
54+
}
55+
5156
@Override
5257
public io.reactivesocket.rx.Observable<Frame> getInput() {
5358
return new io.reactivesocket.rx.Observable<Frame>() {

0 commit comments

Comments
 (0)