Skip to content

Commit 30d556b

Browse files
committed
Support for LoadbalanceRSocketClient
Closes gh-25333
1 parent 71ecca7 commit 30d556b

File tree

2 files changed

+62
-15
lines changed

2 files changed

+62
-15
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,15 @@
2828
import io.rsocket.core.RSocketClient;
2929
import io.rsocket.core.RSocketConnector;
3030
import io.rsocket.frame.decoder.PayloadDecoder;
31+
import io.rsocket.loadbalance.LoadbalanceRSocketClient;
32+
import io.rsocket.loadbalance.LoadbalanceStrategy;
33+
import io.rsocket.loadbalance.LoadbalanceTarget;
3134
import io.rsocket.metadata.WellKnownMimeType;
3235
import io.rsocket.transport.ClientTransport;
3336
import io.rsocket.transport.netty.client.TcpClientTransport;
3437
import io.rsocket.transport.netty.client.WebsocketClientTransport;
3538
import io.rsocket.util.DefaultPayload;
39+
import org.reactivestreams.Publisher;
3640
import reactor.core.publisher.Mono;
3741

3842
import org.springframework.core.ReactiveAdapter;
@@ -171,6 +175,25 @@ public RSocketRequester transport(ClientTransport transport) {
171175
return new DefaultRSocketRequester(client, null, dataMimeType, metaMimeType, strategies);
172176
}
173177

178+
@Override
179+
public RSocketRequester transports(
180+
Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) {
181+
182+
RSocketStrategies strategies = getRSocketStrategies();
183+
MimeType metaMimeType = getMetadataMimeType();
184+
MimeType dataMimeType = getDataMimeType(strategies);
185+
186+
RSocketConnector connector = initConnector(
187+
this.rsocketConnectorConfigurers, metaMimeType, dataMimeType, strategies);
188+
189+
LoadbalanceRSocketClient client = LoadbalanceRSocketClient.builder(targetPublisher)
190+
.connector(connector)
191+
.loadbalanceStrategy(loadbalanceStrategy)
192+
.build();
193+
194+
return new DefaultRSocketRequester(client, null, dataMimeType, metaMimeType, strategies);
195+
}
196+
174197
@Override
175198
@SuppressWarnings("deprecation")
176199
public Mono<RSocketRequester> connectTcp(String host, int port) {

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
package org.springframework.messaging.rsocket;
1818

1919
import java.net.URI;
20+
import java.util.List;
2021
import java.util.function.Consumer;
2122

2223
import io.rsocket.ConnectionSetupPayload;
2324
import io.rsocket.Payload;
2425
import io.rsocket.RSocket;
2526
import io.rsocket.core.RSocketClient;
27+
import io.rsocket.loadbalance.LoadbalanceStrategy;
28+
import io.rsocket.loadbalance.LoadbalanceTarget;
2629
import io.rsocket.transport.ClientTransport;
2730
import io.rsocket.transport.netty.client.TcpClientTransport;
2831
import io.rsocket.transport.netty.client.WebsocketClientTransport;
@@ -116,8 +119,9 @@ static RSocketRequester.Builder builder() {
116119
}
117120

118121
/**
119-
* Wrap an existing {@link RSocket}. Typically used in client or server
120-
* responders to wrap the {@code RSocket} for the remote side.
122+
* Wrap an existing {@link RSocket}. Typically for internal framework use,
123+
* to wrap the remote {@code RSocket} in a client or server responder, but
124+
* it can also be used to wrap any {@link RSocket}.
121125
*/
122126
static RSocketRequester wrap(
123127
RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType,
@@ -224,36 +228,56 @@ interface Builder {
224228
RSocketRequester.Builder apply(Consumer<RSocketRequester.Builder> configurer);
225229

226230
/**
227-
* Build an {@link RSocketRequester} instance for use with a TCP
228-
* transport. Requests are made via {@link io.rsocket.core.RSocketClient}
229-
* which establishes a shared TCP connection to given host and port.
230-
* @param host the host of the server to connect to
231-
* @param port the port of the server to connect to
231+
* Build an {@link RSocketRequester} with an
232+
* {@link io.rsocket.core.RSocketClient} that connects over TCP to the
233+
* given host and port. The requester can be used to make requests
234+
* concurrently. Requests are made over a shared connection that is also
235+
* re-established as needed when further requests are made.
236+
* @param host the host to connect to
237+
* @param port the port to connect to
232238
* @return the created {@code RSocketRequester}
233239
* @since 5.3
234240
*/
235241
RSocketRequester tcp(String host, int port);
236242

237243
/**
238-
* Build an {@link RSocketRequester} instance for use with a WebSocket
239-
* transport. Requests are made via {@link io.rsocket.core.RSocketClient}
240-
* which establishes a shared WebSocket connection to given URL.
241-
* @param uri the URL of the server to connect to
244+
* Build an {@link RSocketRequester} with an
245+
* {@link io.rsocket.core.RSocketClient} that connects over WebSocket to
246+
* the given URL. The requester can be used to make requests
247+
* concurrently. Requests are made over a shared connection that is also
248+
* re-established as needed when further requests are made.
249+
* @param uri the URL to connect to
242250
* @return the created {@code RSocketRequester}
243251
* @since 5.3
244252
*/
245253
RSocketRequester websocket(URI uri);
246254

247255
/**
248-
* Build an {@link RSocketRequester} instance for use with the given
249-
* transport. Requests are made via {@link io.rsocket.core.RSocketClient}
250-
* which establishes a shared connection through the given transport.
251-
* @param transport the transport to use for connecting to the server
256+
* Variant of {@link #tcp(String, int)} and {@link #websocket(URI)}
257+
* with an already initialized {@link ClientTransport}.
258+
* @param transport the transport to connect with
252259
* @return the created {@code RSocketRequester}
253260
* @since 5.3
254261
*/
255262
RSocketRequester transport(ClientTransport transport);
256263

264+
/**
265+
* Build an {@link RSocketRequester} with an
266+
* {@link io.rsocket.loadbalance.LoadbalanceRSocketClient} that will
267+
* connect to one of the given targets selected through the given
268+
* {@link io.rsocket.loadbalance.LoadbalanceRSocketClient}.
269+
* @param targetPublisher a {@code Publisher} that supplies a list of
270+
* target transports to loadbalance against; the given list may be
271+
* periodically updated by the {@code Publisher}.
272+
* @param loadbalanceStrategy the strategy to use for selecting from
273+
* the list of loadbalance targets.
274+
* @return the created {@code RSocketRequester}
275+
* @since 5.3
276+
*/
277+
RSocketRequester transports(
278+
Publisher<List<LoadbalanceTarget>> targetPublisher,
279+
LoadbalanceStrategy loadbalanceStrategy);
280+
257281
/**
258282
* Connect to the server over TCP.
259283
* @param host the server host

0 commit comments

Comments
 (0)