Skip to content

Commit 28cc17e

Browse files
committed
Add the missing Connector class + the helper ReactiveSocketProxy
1 parent aa05464 commit 28cc17e

File tree

2 files changed

+226
-0
lines changed

2 files changed

+226
-0
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package io.reactivesocket;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
7+
import java.util.function.Function;
8+
9+
@FunctionalInterface
10+
public interface ReactiveSocketConnector<T> {
11+
/**
12+
* Asynchronously connect and construct a ReactiveSocket
13+
* @return a Publisher that will return the ReactiveSocket
14+
*/
15+
Publisher<ReactiveSocket> connect(T address);
16+
17+
/**
18+
* Transform the ReactiveSocket returned by the connector via the provided function `func`
19+
* @param func the transformative function
20+
* @return a new ReactiveSocketConnector
21+
*/
22+
default ReactiveSocketConnector<T> chain(Function<ReactiveSocket, ReactiveSocket> func) {
23+
return new ReactiveSocketConnector<T>() {
24+
@Override
25+
public Publisher<ReactiveSocket> connect(T address) {
26+
return subscriber ->
27+
ReactiveSocketConnector.this.connect(address).subscribe(new Subscriber<ReactiveSocket>() {
28+
@Override
29+
public void onSubscribe(Subscription s) {
30+
subscriber.onSubscribe(s);
31+
}
32+
33+
@Override
34+
public void onNext(ReactiveSocket reactiveSocket) {
35+
ReactiveSocket socket = func.apply(reactiveSocket);
36+
subscriber.onNext(socket);
37+
}
38+
39+
@Override
40+
public void onError(Throwable t) {
41+
subscriber.onError(t);
42+
}
43+
44+
@Override
45+
public void onComplete() {
46+
subscriber.onComplete();
47+
}
48+
});
49+
}
50+
};
51+
}
52+
53+
/**
54+
* Create a ReactiveSocketFactory from a ReactiveSocketConnector
55+
* @param address the address to connect the connector to
56+
* @return the factory
57+
*/
58+
default ReactiveSocketFactory<T> toFactory(T address) {
59+
return new ReactiveSocketFactory<T>() {
60+
@Override
61+
public Publisher<ReactiveSocket> apply() {
62+
return ReactiveSocketConnector.this.connect(address);
63+
}
64+
65+
@Override
66+
public double availability() {
67+
return 1.0;
68+
}
69+
70+
@Override
71+
public T remote() {
72+
return address;
73+
}
74+
};
75+
}
76+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket.util;
17+
18+
import io.reactivesocket.Payload;
19+
import io.reactivesocket.ReactiveSocket;
20+
import io.reactivesocket.rx.Completable;
21+
import org.reactivestreams.Publisher;
22+
import org.reactivestreams.Subscriber;
23+
24+
import java.util.function.Consumer;
25+
import java.util.function.Function;
26+
27+
28+
/**
29+
* Wrapper/Proxy for a ReactiveSocket.
30+
* This is useful when we want to override a specific method.
31+
*/
32+
public class ReactiveSocketProxy implements ReactiveSocket {
33+
protected final ReactiveSocket child;
34+
private final Function<Subscriber<? super Payload>, Subscriber<? super Payload>> subscriberWrapper;
35+
36+
public ReactiveSocketProxy(ReactiveSocket child, Function<Subscriber<? super Payload>, Subscriber<? super Payload>> subscriberWrapper) {
37+
this.child = child;
38+
this.subscriberWrapper = subscriberWrapper;
39+
}
40+
41+
public ReactiveSocketProxy(ReactiveSocket child) {
42+
this(child, null);
43+
}
44+
45+
@Override
46+
public Publisher<Void> fireAndForget(Payload payload) {
47+
return child.fireAndForget(payload);
48+
}
49+
50+
@Override
51+
public Publisher<Payload> requestResponse(Payload payload) {
52+
if (subscriberWrapper == null) {
53+
return child.requestResponse(payload);
54+
} else {
55+
return s -> {
56+
Subscriber<? super Payload> subscriber = subscriberWrapper.apply(s);
57+
child.requestResponse(payload).subscribe(subscriber);
58+
};
59+
}
60+
}
61+
62+
@Override
63+
public Publisher<Payload> requestStream(Payload payload) {
64+
if (subscriberWrapper == null) {
65+
return child.requestStream(payload);
66+
} else {
67+
return s -> {
68+
Subscriber<? super Payload> subscriber = subscriberWrapper.apply(s);
69+
child.requestStream(payload).subscribe(subscriber);
70+
};
71+
}
72+
73+
}
74+
75+
@Override
76+
public Publisher<Payload> requestSubscription(Payload payload) {
77+
if (subscriberWrapper == null) {
78+
return child.requestSubscription(payload);
79+
} else {
80+
return s -> {
81+
Subscriber<? super Payload> subscriber = subscriberWrapper.apply(s);
82+
child.requestSubscription(payload).subscribe(subscriber);
83+
};
84+
}
85+
86+
}
87+
88+
@Override
89+
public Publisher<Payload> requestChannel(Publisher<Payload> payloads) {
90+
if (subscriberWrapper == null) {
91+
return child.requestChannel(payloads);
92+
} else {
93+
return s -> {
94+
Subscriber<? super Payload> subscriber = subscriberWrapper.apply(s);
95+
child.requestChannel(payloads).subscribe(subscriber);
96+
};
97+
}
98+
99+
}
100+
101+
@Override
102+
public Publisher<Void> metadataPush(Payload payload) {
103+
return child.metadataPush(payload);
104+
}
105+
106+
@Override
107+
public double availability() {
108+
return child.availability();
109+
}
110+
111+
@Override
112+
public void start(Completable c) {
113+
child.start(c);
114+
}
115+
116+
@Override
117+
public void onRequestReady(Consumer<Throwable> c) {
118+
child.onRequestReady(c);
119+
}
120+
121+
@Override
122+
public void onRequestReady(Completable c) {
123+
child.onRequestReady(c);
124+
}
125+
126+
@Override
127+
public void onShutdown(Completable c) {
128+
child.onShutdown(c);
129+
}
130+
131+
@Override
132+
public void sendLease(int ttl, int numberOfRequests) {
133+
child.sendLease(ttl, numberOfRequests);
134+
}
135+
136+
@Override
137+
public void shutdown() {
138+
child.shutdown();
139+
}
140+
141+
@Override
142+
public void close() throws Exception {
143+
child.close();
144+
}
145+
146+
@Override
147+
public String toString() {
148+
return "ReactiveSocketProxy(" + child.toString() + ")";
149+
}
150+
}

0 commit comments

Comments
 (0)