1919import com .appunite .websocket .rx .object .ObjectSerializer ;
2020import com .appunite .websocket .rx .object .ObjectWebSocketSender ;
2121import com .appunite .websocket .rx .object .RxObjectWebSockets ;
22- import com .appunite .websocket .rx .object .messages .RxObjectEventConn ;
23- import com .appunite .websocket .rx .messages .RxEventConn ;
24-
25- import okhttp3 .RequestBody ;
26- import okhttp3 .ws .WebSocket ;
2722
23+ import java .util .concurrent .Callable ;
2824import java .util .logging .Level ;
2925import java .util .logging .Logger ;
3026
3127import javax .annotation .Nonnull ;
3228
33- import rx .Observable ;
34- import rx .Observer ;
35- import rx .Subscriber ;
36- import rx .functions .Func1 ;
29+ import okhttp3 .WebSocket ;
30+ import rx .Single ;
3731
3832public class RxMoreObservables {
3933
@@ -42,91 +36,47 @@ public class RxMoreObservables {
4236 public RxMoreObservables () {
4337 }
4438
45- @ Nonnull
46- private static Observable <Object > sendMessage (final @ Nonnull WebSocket sender , final @ Nonnull String message ) {
47- return Observable .create (new Observable .OnSubscribe <Object >() {
48- @ Override
49- public void call (Subscriber <? super Object > subscriber ) {
50- try {
51- logger .log (Level .FINE , "sendStringMessage: {0}" , message );
52- sender .sendMessage (RequestBody .create (WebSocket .TEXT , message ));
53- subscriber .onNext (new Object ());
54- subscriber .onCompleted ();
55- } catch (Exception e ) {
56- subscriber .onError (e );
57- }
58- }
59- });
60- }
61-
6239 /**
63- * Transformer that convert String message to observable that returns if message was sent
64- *
65- * @param connection connection event that is used to send message
66- * @return Observable that returns {@link Observer#onNext(Object)} with new Object()
67- * and {@link Observer#onCompleted()} or {@link Observer#onError(Throwable)}
40+ * Enqueue message to send
6841 *
69- * @see #sendMessage(ObjectWebSocketSender, Object)
42+ * @param sender connection event that is used to send message
43+ * @param message message to send
44+ * @return Single that returns true if message was enqueued
45+ * @see #sendObjectMessage(ObjectWebSocketSender, Object)
7046 */
71- @ SuppressWarnings ("unused" )
7247 @ Nonnull
73- public static Observable . Transformer < String , Object > sendMessage (@ Nonnull final RxEventConn connection ) {
74- return new Observable . Transformer < String , Object >() {
48+ public static Single < Boolean > sendMessage (final @ Nonnull WebSocket sender , final @ Nonnull String message ) {
49+ return Single . fromCallable ( new Callable < Boolean >() {
7550 @ Override
76- public Observable <Object > call (Observable <String > stringObservable ) {
77- return stringObservable .flatMap (new Func1 <String , Observable <?>>() {
78- @ Override
79- public Observable <?> call (String message ) {
80- return sendMessage (connection .sender (), message );
81- }
82- });
83- }
84- };
85- }
86-
87- @ Nonnull
88- private static Observable <Object > sendMessage (final @ Nonnull ObjectWebSocketSender sender , final @ Nonnull Object message ) {
89- return Observable .create (new Observable .OnSubscribe <Object >() {
90- @ Override
91- public void call (Subscriber <? super Object > subscriber ) {
92- try {
93- logger .log (Level .FINE , "sendStringMessage: {0}" , message .toString ());
94- sender .sendObjectMessage (message );
95- subscriber .onNext (new Object ());
96- subscriber .onCompleted ();
97- } catch (Exception e ) {
98- subscriber .onError (e );
99- }
51+ public Boolean call () throws Exception {
52+ logger .log (Level .FINE , "sendStringMessage: {0}" , message );
53+ return sender .send (message );
10054 }
10155 });
10256 }
10357
104-
10558 /**
106- * Transformer that convert Object message to observable that returns if message was sent
107- *
59+ * Send object
60+ * <p>
10861 * Object is parsed via {@link ObjectSerializer} given by
10962 * {@link RxObjectWebSockets#RxObjectWebSockets(RxWebSockets, ObjectSerializer)}
11063 *
111- * @param connection connection event that is used to send message
112- * @return Observable that returns {@link Observer#onNext(Object)} with new Object()
113- * and {@link Observer#onCompleted()} or {@link Observer#onError(Throwable)}
114- *
115- * @see #sendMessage(RxEventConn )
64+ * @param sender connection event that is used to send message
65+ * @param message message to serialize and sent
66+ * @return Single that returns true if message was enqueued or ObjectParseException if couldn't
67+ * serialize
68+ * @see #sendMessage(WebSocket, String )
11669 */
117- @ SuppressWarnings ("unused" )
11870 @ Nonnull
119- public static Observable . Transformer < Object , Object > sendMessage ( @ Nonnull final RxObjectEventConn connection ) {
120- return new Observable . Transformer < Object , Object >() {
71+ public static Single < Boolean > sendObjectMessage ( final @ Nonnull ObjectWebSocketSender sender , final @ Nonnull Object message ) {
72+ return Single . fromCallable ( new Callable < Boolean >() {
12173 @ Override
122- public Observable <Object > call (Observable <Object > stringObservable ) {
123- return stringObservable .flatMap (new Func1 <Object , Observable <?>>() {
124- @ Override
125- public Observable <?> call (Object message ) {
126- return sendMessage (connection .sender (), message );
127- }
128- });
74+ public Boolean call () throws Exception {
75+ logger .log (Level .FINE , "sendStringMessage: {0}" , message );
76+ return sender .sendObjectMessage (message );
12977 }
130- };
78+ }) ;
13179 }
80+
81+
13282}
0 commit comments