1515 */
1616package io .reactivesocket ;
1717
18- import java .io .IOException ;
19- import java .util .concurrent .CountDownLatch ;
20- import java .util .concurrent .atomic .AtomicInteger ;
21- import java .util .concurrent .atomic .AtomicReference ;
22- import java .util .function .BiConsumer ;
23- import java .util .function .Consumer ;
24-
25- import org .reactivestreams .Publisher ;
26- import org .reactivestreams .Subscriber ;
27- import org .reactivestreams .Subscription ;
28-
2918import io .reactivesocket .internal .Requester ;
3019import io .reactivesocket .internal .Responder ;
3120import io .reactivesocket .internal .rx .CompositeCompletable ;
3423import io .reactivesocket .rx .Disposable ;
3524import io .reactivesocket .rx .Observable ;
3625import io .reactivesocket .rx .Observer ;
37- import uk .co .real_logic .agrona .BitUtil ;
26+ import org .agrona .BitUtil ;
27+ import org .reactivestreams .Publisher ;
28+ import org .reactivestreams .Subscriber ;
29+ import org .reactivestreams .Subscription ;
30+
31+ import java .io .IOException ;
32+ import java .util .concurrent .atomic .AtomicInteger ;
33+ import java .util .function .Consumer ;
3834
3935import static io .reactivesocket .LeaseGovernor .NULL_LEASE_GOVERNOR ;
4036
4137/**
42- * Interface for a connection that supports sending requests and receiving responses
43- *
44- * Created by servers for connections Created on demand for clients
38+ * An implementation of {@link ReactiveSocket}
4539 */
46- public class ReactiveSocket implements AutoCloseable {
40+ public class ReactiveSocketImpl implements ReactiveSocket {
4741 private static final RequestHandler EMPTY_HANDLER = new RequestHandler .Builder ().build ();
4842
4943 private static final Consumer <Throwable > DEFAULT_ERROR_STREAM = t -> {
@@ -62,7 +56,7 @@ public class ReactiveSocket implements AutoCloseable {
6256 private final ConnectionSetupHandler responderConnectionHandler ;
6357 private final LeaseGovernor leaseGovernor ;
6458
65- private ReactiveSocket (
59+ private ReactiveSocketImpl (
6660 DuplexConnection connection ,
6761 boolean isServer ,
6862 ConnectionSetupPayload serverRequestorSetupPayload ,
@@ -118,7 +112,7 @@ public static ReactiveSocket fromClientConnection(
118112 }
119113 final RequestHandler h = handler != null ? handler : EMPTY_HANDLER ;
120114 Consumer <Throwable > es = errorStream != null ? errorStream : DEFAULT_ERROR_STREAM ;
121- return new ReactiveSocket (connection , false , setup , h , null , NULL_LEASE_GOVERNOR , es );
115+ return new ReactiveSocketImpl (connection , false , setup , h , null , NULL_LEASE_GOVERNOR , es );
122116 }
123117
124118 /**
@@ -178,7 +172,7 @@ public static ReactiveSocket fromServerConnection(
178172 LeaseGovernor leaseGovernor ,
179173 Consumer <Throwable > errorConsumer
180174 ) {
181- return new ReactiveSocket (connection , true , null , null , connectionHandler ,
175+ return new ReactiveSocketImpl (connection , true , null , null , connectionHandler ,
182176 leaseGovernor , errorConsumer );
183177 }
184178
@@ -192,31 +186,37 @@ public static ReactiveSocket fromServerConnection(
192186 /**
193187 * Initiate a request response exchange
194188 */
189+ @ Override
195190 public Publisher <Payload > requestResponse (final Payload payload ) {
196191 assertRequester ();
197192 return requester .requestResponse (payload );
198193 }
199194
195+ @ Override
200196 public Publisher <Void > fireAndForget (final Payload payload ) {
201197 assertRequester ();
202198 return requester .fireAndForget (payload );
203199 }
204200
201+ @ Override
205202 public Publisher <Payload > requestStream (final Payload payload ) {
206203 assertRequester ();
207204 return requester .requestStream (payload );
208205 }
209206
207+ @ Override
210208 public Publisher <Payload > requestSubscription (final Payload payload ) {
211209 assertRequester ();
212210 return requester .requestSubscription (payload );
213211 }
214212
213+ @ Override
215214 public Publisher <Payload > requestChannel (final Publisher <Payload > payloads ) {
216215 assertRequester ();
217216 return requester .requestChannel (payloads );
218217 }
219218
219+ @ Override
220220 public Publisher <Void > metadataPush (final Payload payload ) {
221221 assertRequester ();
222222 return requester .metadataPush (payload );
@@ -239,33 +239,20 @@ private void assertRequester() {
239239 }
240240 }
241241
242- /**
243- * Client check for availability to send request based on lease
244- *
245- * @return 0.0 to 1.0 indicating availability of sending requests
246- */
242+ @ Override
247243 public double availability () {
248244 // TODO: can happen in either direction
249245 assertRequester ();
250246 return requester .availability ();
251247 }
252248
253- /**
254- * Server granting new lease information to client
255- *
256- * Initial lease semantics are that server waits for periodic granting of leases by server side.
257- *
258- * @param ttl
259- * @param numberOfRequests
260- */
249+ @ Override
261250 public void sendLease (int ttl , int numberOfRequests ) {
262251 // TODO: can happen in either direction
263252 responder .sendLease (ttl , numberOfRequests );
264253 }
265254
266- /**
267- * Start protocol processing on the given DuplexConnection.
268- */
255+ @ Override
269256 public final void start (Completable c ) {
270257 if (isServer ) {
271258 responder = Responder .createServerResponder (
@@ -345,20 +332,12 @@ public void error(Throwable e) {
345332
346333 private final CompositeCompletable requesterReady = new CompositeCompletable ();
347334
348- /**
349- * Invoked when Requester is ready with success or fail.
350- *
351- * @param c
352- */
335+ @ Override
353336 public final void onRequestReady (Completable c ) {
354337 requesterReady .add (c );
355338 }
356339
357- /**
358- * Invoked when Requester is ready. Non-null exception if error. Null if success.
359- *
360- * @param c
361- */
340+ @ Override
362341 public final void onRequestReady (Consumer <Throwable > c ) {
363342 requesterReady .add (new Completable () {
364343 @ Override
@@ -458,36 +437,6 @@ public void addOutput(Frame f, Completable callback) {
458437
459438 };
460439
461- /**
462- * Start and block the current thread until startup is finished.
463- *
464- * @throws RuntimeException
465- * of InterruptedException
466- */
467- public final void startAndWait () {
468- CountDownLatch latch = new CountDownLatch (1 );
469- AtomicReference <Throwable > err = new AtomicReference <>();
470- start (new Completable () {
471- @ Override
472- public void success () {
473- latch .countDown ();
474- }
475-
476- @ Override
477- public void error (Throwable e ) {
478- latch .countDown ();
479- }
480- });
481- try {
482- latch .await ();
483- } catch (InterruptedException e ) {
484- throw new RuntimeException (e );
485- }
486- if (err .get () != null ) {
487- throw new RuntimeException (err .get ());
488- }
489- }
490-
491440 @ Override
492441 public void close () throws Exception {
493442 connection .close ();
@@ -500,6 +449,7 @@ public void close() throws Exception {
500449 }
501450 }
502451
452+ @ Override
503453 public void shutdown () {
504454 try {
505455 close ();
0 commit comments