1
1
/*
2
- * Copyright 2002-2019 the original author or authors.
2
+ * Copyright 2002-2020 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
25
25
import java .util .function .Consumer ;
26
26
27
27
import io .rsocket .Payload ;
28
- import io .rsocket .RSocketFactory ;
29
28
import io .rsocket .frame .decoder .PayloadDecoder ;
30
29
import io .rsocket .metadata .WellKnownMimeType ;
31
30
import io .rsocket .transport .ClientTransport ;
32
31
import io .rsocket .transport .netty .client .TcpClientTransport ;
33
32
import io .rsocket .transport .netty .client .WebsocketClientTransport ;
33
+ import io .rsocket .util .DefaultPayload ;
34
34
import reactor .core .publisher .Mono ;
35
35
36
36
import org .springframework .core .ReactiveAdapter ;
43
43
import org .springframework .core .io .buffer .NettyDataBufferFactory ;
44
44
import org .springframework .lang .Nullable ;
45
45
import org .springframework .util .Assert ;
46
+ import org .springframework .util .ClassUtils ;
46
47
import org .springframework .util .CollectionUtils ;
47
48
import org .springframework .util .MimeType ;
48
49
import org .springframework .util .MimeTypeUtils ;
56
57
*/
57
58
final class DefaultRSocketRequesterBuilder implements RSocketRequester .Builder {
58
59
60
+ private final static boolean rsocketConnectorPresent =
61
+ ClassUtils .isPresent ("io.rsocket.core.RSocketConnector" ,
62
+ DefaultRSocketRequesterBuilder .class .getClassLoader ());
63
+
64
+
59
65
private static final Map <String , Object > HINTS = Collections .emptyMap ();
60
66
61
67
private static final byte [] EMPTY_BYTE_ARRAY = new byte [0 ];
62
68
69
+ private static final Payload EMPTY_SETUP_PAYLOAD = DefaultPayload .create (EMPTY_BYTE_ARRAY );
70
+
63
71
64
72
@ Nullable
65
73
private MimeType dataMimeType ;
@@ -84,7 +92,10 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
84
92
85
93
private List <Consumer <RSocketStrategies .Builder >> strategiesConfigurers = new ArrayList <>();
86
94
87
- private List <ClientRSocketFactoryConfigurer > rsocketConfigurers = new ArrayList <>();
95
+ private List <RSocketConnectorConfigurer > rsocketConnectorConfigurers = new ArrayList <>();
96
+
97
+ @ SuppressWarnings ("deprecation" )
98
+ private List <ClientRSocketFactoryConfigurer > rsocketFactoryConfigurers = new ArrayList <>();
88
99
89
100
90
101
@ Override
@@ -133,8 +144,15 @@ public RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Bui
133
144
}
134
145
135
146
@ Override
147
+ public RSocketRequester .Builder rsocketConnector (RSocketConnectorConfigurer configurer ) {
148
+ this .rsocketConnectorConfigurers .add (configurer );
149
+ return this ;
150
+ }
151
+
152
+ @ Override
153
+ @ Deprecated
136
154
public RSocketRequester .Builder rsocketFactory (ClientRSocketFactoryConfigurer configurer ) {
137
- this .rsocketConfigurers .add (configurer );
155
+ this .rsocketFactoryConfigurers .add (configurer );
138
156
return this ;
139
157
}
140
158
@@ -164,28 +182,25 @@ private Mono<RSocketRequester> doConnect(ClientTransport transport) {
164
182
Assert .isTrue (!rsocketStrategies .encoders ().isEmpty (), "No encoders" );
165
183
Assert .isTrue (!rsocketStrategies .decoders ().isEmpty (), "No decoders" );
166
184
167
- RSocketFactory .ClientRSocketFactory factory = RSocketFactory .connect ();
168
- this .rsocketConfigurers .forEach (configurer -> configurer .configure (factory ));
169
-
170
- if (rsocketStrategies .dataBufferFactory () instanceof NettyDataBufferFactory ) {
171
- factory .frameDecoder (PayloadDecoder .ZERO_COPY );
172
- }
173
-
174
185
MimeType metaMimeType = this .metadataMimeType != null ? this .metadataMimeType :
175
186
MimeTypeUtils .parseMimeType (WellKnownMimeType .MESSAGE_RSOCKET_COMPOSITE_METADATA .getString ());
176
187
177
188
MimeType dataMimeType = getDataMimeType (rsocketStrategies );
178
- factory .dataMimeType (dataMimeType .toString ());
179
- factory .metadataMimeType (metaMimeType .toString ());
180
-
181
- return getSetupPayload (dataMimeType , metaMimeType , rsocketStrategies )
182
- .doOnNext (factory ::setupPayload )
183
- .then (Mono .defer (() ->
184
- factory .transport (transport )
185
- .start ()
186
- .map (rsocket -> new DefaultRSocketRequester (
187
- rsocket , dataMimeType , metaMimeType , rsocketStrategies ))
188
- ));
189
+
190
+ if (rsocketConnectorPresent ) {
191
+ return getSetupPayload (dataMimeType , metaMimeType , rsocketStrategies )
192
+ .flatMap (payload ->
193
+ new RSocketConnectorHelper ().connect (
194
+ this .rsocketConnectorConfigurers , this .rsocketFactoryConfigurers ,
195
+ metaMimeType , dataMimeType , payload , rsocketStrategies , transport ));
196
+ }
197
+ else {
198
+ return getSetupPayload (dataMimeType , metaMimeType , rsocketStrategies )
199
+ .flatMap (payload ->
200
+ new RSocketFactoryHelper ().connect (
201
+ this .rsocketFactoryConfigurers , metaMimeType , dataMimeType , payload ,
202
+ rsocketStrategies , transport ));
203
+ }
189
204
}
190
205
191
206
private RSocketStrategies getRSocketStrategies () {
@@ -234,7 +249,7 @@ private Mono<Payload> getSetupPayload(
234
249
Object data = this .setupData ;
235
250
boolean hasMetadata = (this .setupRoute != null || !CollectionUtils .isEmpty (this .setupMetadata ));
236
251
if (!hasMetadata && data == null ) {
237
- return Mono .empty ( );
252
+ return Mono .just ( EMPTY_SETUP_PAYLOAD );
238
253
}
239
254
240
255
Mono <DataBuffer > dataMono = Mono .empty ();
@@ -269,4 +284,69 @@ private Mono<Payload> getSetupPayload(
269
284
.doOnDiscard (Payload .class , Payload ::release );
270
285
}
271
286
287
+
288
+ private static class RSocketConnectorHelper {
289
+
290
+ @ SuppressWarnings ("deprecation" )
291
+ Mono <RSocketRequester > connect (
292
+ List <RSocketConnectorConfigurer > connectorConfigurers ,
293
+ List <ClientRSocketFactoryConfigurer > factoryConfigurers ,
294
+ MimeType metaMimeType , MimeType dataMimeType , Payload setupPayload ,
295
+ RSocketStrategies rsocketStrategies , ClientTransport transport ) {
296
+
297
+ io .rsocket .core .RSocketConnector connector = io .rsocket .core .RSocketConnector .create ();
298
+ connectorConfigurers .forEach (c -> c .configure (connector ));
299
+
300
+ if (!factoryConfigurers .isEmpty ()) {
301
+ io .rsocket .RSocketFactory .ClientRSocketFactory factory =
302
+ new io .rsocket .RSocketFactory .ClientRSocketFactory (connector );
303
+ factoryConfigurers .forEach (c -> c .configure (factory ));
304
+ }
305
+
306
+ if (rsocketStrategies .dataBufferFactory () instanceof NettyDataBufferFactory ) {
307
+ connector .payloadDecoder (PayloadDecoder .ZERO_COPY );
308
+ }
309
+
310
+ if (setupPayload != EMPTY_SETUP_PAYLOAD ) {
311
+ connector .setupPayload (setupPayload );
312
+ }
313
+
314
+ return connector
315
+ .metadataMimeType (metaMimeType .toString ())
316
+ .dataMimeType (dataMimeType .toString ())
317
+ .connect (transport )
318
+ .map (rsocket -> new DefaultRSocketRequester (
319
+ rsocket , dataMimeType , metaMimeType , rsocketStrategies ));
320
+ }
321
+ }
322
+
323
+
324
+ @ SuppressWarnings ("deprecation" )
325
+ private static class RSocketFactoryHelper {
326
+
327
+ Mono <RSocketRequester > connect (
328
+ List <ClientRSocketFactoryConfigurer > configurers ,
329
+ MimeType metaMimeType , MimeType dataMimeType , Payload setupPayload ,
330
+ RSocketStrategies rsocketStrategies , ClientTransport transport ) {
331
+
332
+ io .rsocket .RSocketFactory .ClientRSocketFactory factory = io .rsocket .RSocketFactory .connect ();
333
+ configurers .forEach (c -> c .configure (factory ));
334
+
335
+ if (rsocketStrategies .dataBufferFactory () instanceof NettyDataBufferFactory ) {
336
+ factory .frameDecoder (PayloadDecoder .ZERO_COPY );
337
+ }
338
+
339
+ if (setupPayload != EMPTY_SETUP_PAYLOAD ) {
340
+ factory .setupPayload (setupPayload );
341
+ }
342
+
343
+ return factory .metadataMimeType (metaMimeType .toString ())
344
+ .dataMimeType (dataMimeType .toString ())
345
+ .transport (transport )
346
+ .start ()
347
+ .map (rsocket -> new DefaultRSocketRequester (
348
+ rsocket , dataMimeType , metaMimeType , rsocketStrategies ));
349
+ }
350
+ }
351
+
272
352
}
0 commit comments