Skip to content

Commit dfe6987

Browse files
committed
Merge pull request #85 from ReactiveSocket/stevegury/availability
Introduce Connector and availability for Connection
2 parents f62e73d + 3600cd1 commit dfe6987

File tree

9 files changed

+265
-96
lines changed

9 files changed

+265
-96
lines changed

src/main/java/io/reactivesocket/DefaultReactiveSocket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,10 @@ public void addOutput(Frame f, Completable callback) {
440440
connection.addOutput(f, callback);
441441
}
442442

443+
@Override
444+
public double availability() {
445+
return connection.availability();
446+
}
443447
};
444448

445449
@Override

src/main/java/io/reactivesocket/DuplexConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,10 @@ default void addOutput(Frame frame, Completable callback) {
3838
s.onComplete();
3939
}, callback);
4040
}
41+
42+
/**
43+
* @return the availability of the underlying connection, a number in [0.0, 1.0]
44+
* (higher is better).
45+
*/
46+
double availability();
4147
}

src/main/java/io/reactivesocket/ReactiveSocket.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
* Interface for a connection that supports sending requests and receiving responses
2727
*/
2828
public interface ReactiveSocket extends AutoCloseable {
29-
Publisher<Payload> requestResponse(final Payload payload);
30-
3129
Publisher<Void> fireAndForget(final Payload payload);
3230

31+
Publisher<Payload> requestResponse(final Payload payload);
32+
3333
Publisher<Payload> requestStream(final Payload payload);
3434

3535
Publisher<Payload> requestSubscription(final Payload payload);
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package io.reactivesocket;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
7+
import java.util.function.Function;
8+
9+
@FunctionalInterface
10+
public interface ReactiveSocketConnector<T> {
11+
/**
12+
* Asynchronously connect and construct a ReactiveSocket
13+
* @return a Publisher that will return the ReactiveSocket
14+
*/
15+
Publisher<ReactiveSocket> connect(T address);
16+
17+
/**
18+
* Transform the ReactiveSocket returned by the connector via the provided function `func`
19+
* @param func the transformative function
20+
* @return a new ReactiveSocketConnector
21+
*/
22+
default ReactiveSocketConnector<T> chain(Function<ReactiveSocket, ReactiveSocket> func) {
23+
return new ReactiveSocketConnector<T>() {
24+
@Override
25+
public Publisher<ReactiveSocket> connect(T address) {
26+
return subscriber ->
27+
ReactiveSocketConnector.this.connect(address).subscribe(new Subscriber<ReactiveSocket>() {
28+
@Override
29+
public void onSubscribe(Subscription s) {
30+
subscriber.onSubscribe(s);
31+
}
32+
33+
@Override
34+
public void onNext(ReactiveSocket reactiveSocket) {
35+
ReactiveSocket socket = func.apply(reactiveSocket);
36+
subscriber.onNext(socket);
37+
}
38+
39+
@Override
40+
public void onError(Throwable t) {
41+
subscriber.onError(t);
42+
}
43+
44+
@Override
45+
public void onComplete() {
46+
subscriber.onComplete();
47+
}
48+
});
49+
}
50+
};
51+
}
52+
53+
/**
54+
* Create a ReactiveSocketFactory from a ReactiveSocketConnector
55+
* @param address the address to connect the connector to
56+
* @return the factory
57+
*/
58+
default ReactiveSocketFactory<T> toFactory(T address) {
59+
return new ReactiveSocketFactory<T>() {
60+
@Override
61+
public Publisher<ReactiveSocket> apply() {
62+
return ReactiveSocketConnector.this.connect(address);
63+
}
64+
65+
@Override
66+
public double availability() {
67+
return 1.0;
68+
}
69+
70+
@Override
71+
public T remote() {
72+
return address;
73+
}
74+
};
75+
}
76+
}

src/main/java/io/reactivesocket/ReactiveSocketFactory.java

Lines changed: 17 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -15,102 +15,29 @@
1515
*/
1616
package io.reactivesocket;
1717

18-
import io.reactivesocket.internal.rx.EmptySubscription;
1918
import org.reactivestreams.Publisher;
20-
import org.reactivestreams.Subscriber;
21-
import org.reactivestreams.Subscription;
22-
23-
import java.util.NoSuchElementException;
24-
import java.util.concurrent.CompletableFuture;
25-
import java.util.concurrent.ScheduledExecutorService;
26-
import java.util.concurrent.TimeUnit;
27-
import java.util.concurrent.TimeoutException;
28-
import java.util.concurrent.atomic.AtomicBoolean;
29-
30-
@FunctionalInterface
31-
public interface ReactiveSocketFactory<T, R extends ReactiveSocket> {
32-
33-
Publisher<R> call(T t);
19+
import java.net.SocketAddress;
3420

21+
/**
22+
* Factory of ReactiveSocket interface
23+
* This abstraction is useful for abstracting the creation of a ReactiveSocket
24+
* (e.g. inside the LoadBalancer which create ReactiveSocket as needed)
25+
*/
26+
public interface ReactiveSocketFactory<T> {
3527
/**
36-
* Gets a socket in a blocking manner
37-
* @param t configuration to create the reactive socket
38-
* @return blocks on create the socket
28+
* Construct the ReactiveSocket
29+
* @return
3930
*/
40-
default R callAndWait(T t) {
41-
CompletableFuture<R> future = new CompletableFuture<>();
42-
43-
call(t)
44-
.subscribe(new Subscriber<R>() {
45-
@Override
46-
public void onSubscribe(Subscription s) {
47-
s.request(1);
48-
}
49-
50-
@Override
51-
public void onNext(R reactiveSocket) {
52-
future.complete(reactiveSocket);
53-
}
54-
55-
@Override
56-
public void onError(Throwable t) {
57-
future.completeExceptionally(t);
58-
}
59-
60-
@Override
61-
public void onComplete() {
62-
future.completeExceptionally(new NoSuchElementException("Sequence contains no elements"));
63-
}
64-
});
65-
66-
return future.join();
67-
}
31+
Publisher<ReactiveSocket> apply();
6832

6933
/**
70-
*
71-
* @param t the configuration used to create the reactive socket
72-
* @param timeout timeout
73-
* @param timeUnit timeout units
74-
* @param executorService ScheduledExecutorService to schedule the timeout on
75-
* @return
34+
* @return a positive numbers representing the availability of the factory.
35+
* Higher is better, 0.0 means not available
7636
*/
77-
default Publisher<R> call(T t, long timeout, TimeUnit timeUnit, ScheduledExecutorService executorService) {
78-
Publisher<R> reactiveSocketPublisher = subscriber -> {
79-
AtomicBoolean complete = new AtomicBoolean();
80-
subscriber.onSubscribe(EmptySubscription.INSTANCE);
81-
call(t)
82-
.subscribe(new Subscriber<R>() {
83-
@Override
84-
public void onSubscribe(Subscription s) {
85-
s.request(1);
86-
}
87-
88-
@Override
89-
public void onNext(R reactiveSocket) {
90-
subscriber.onNext(reactiveSocket);
91-
}
92-
93-
@Override
94-
public void onError(Throwable t) {
95-
subscriber.onError(t);
96-
}
97-
98-
@Override
99-
public void onComplete() {
100-
if (complete.compareAndSet(false, true)) {
101-
subscriber.onComplete();
102-
}
103-
}
104-
});
105-
106-
executorService.schedule(() -> {
107-
if (complete.compareAndSet(false, true)) {
108-
subscriber.onError(new TimeoutException());
109-
}
110-
}, timeout, timeUnit);
111-
};
112-
113-
return reactiveSocketPublisher;
114-
}
37+
double availability();
11538

39+
/**
40+
* @return an identifier of the remote location
41+
*/
42+
T remote();
11643
}

src/main/java/io/reactivesocket/internal/Requester.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,14 +286,14 @@ private void assertStarted() {
286286
*/
287287
public double availability() {
288288
if (!honorLease) {
289-
return 1.0;
289+
return connection.availability();
290290
}
291291
final long now = System.currentTimeMillis();
292292
double available = 0.0;
293293
if (numberOfRemainingRequests > 0 && (now < ttlExpiration)) {
294294
available = 1.0;
295295
}
296-
return available;
296+
return available * connection.availability();
297297
}
298298

299299
/*
@@ -873,6 +873,7 @@ public void error(Throwable e) {
873873

874874
Publisher<Frame> keepaliveTicker =
875875
PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS);
876+
876877
connection.addOutput(keepaliveTicker,
877878
new Completable() {
878879
public void success() {}

0 commit comments

Comments
 (0)