11package io .scalecube .services ;
22
3- import static java .util .Objects .requireNonNull ;
4-
5- import io .scalecube .net .Address ;
63import io .scalecube .services .api .ErrorData ;
74import io .scalecube .services .api .ServiceMessage ;
85import io .scalecube .services .exceptions .DefaultErrorMapper ;
@@ -162,17 +159,6 @@ public Mono<Void> oneWay(ServiceMessage request) {
162159 return Mono .defer (() -> requestOne (request , Void .class ).then ());
163160 }
164161
165- /**
166- * Issues fire-and-forget request.
167- *
168- * @param request request message to send.
169- * @param address of remote target service to invoke.
170- * @return mono publisher completing normally or with error.
171- */
172- public Mono <Void > oneWay (ServiceMessage request , Address address ) {
173- return Mono .defer (() -> requestOne (request , Void .class , address ).then ());
174- }
175-
176162 /**
177163 * Issues request-and-reply request.
178164 *
@@ -187,48 +173,31 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request) {
187173 * Issues request-and-reply request.
188174 *
189175 * @param request request message to send.
190- * @param responseType type of response.
176+ * @param responseType type of response (optional) .
191177 * @return mono publisher completing with single response message or with error.
192178 */
193179 public Mono <ServiceMessage > requestOne (ServiceMessage request , Type responseType ) {
194180 return Mono .defer (
195181 () -> {
196- Objects .requireNonNull (request .qualifier (), "qualifier" );
197-
198182 ServiceMethodInvoker methodInvoker ;
199183 if (methodRegistry != null
200184 && (methodInvoker = methodRegistry .getInvoker (request .qualifier ())) != null ) {
201185 // local service
202186 return methodInvoker .invokeOne (request ).map (this ::throwIfError );
203187 } else {
204188 // remote service
205- return Mono .fromCallable (() -> addressLookup (request ))
206- .flatMap (address -> requestOne (request , responseType , address ));
189+ Objects .requireNonNull (transport , "[requestOne] transport" );
190+ return Mono .fromCallable (() -> serviceLookup (request ))
191+ .flatMap (
192+ serviceReference ->
193+ transport
194+ .create (serviceReference )
195+ .requestResponse (request , responseType )
196+ .map (this ::throwIfError ));
207197 }
208198 });
209199 }
210200
211- /**
212- * Given an address issues request-and-reply request to a remote address.
213- *
214- * @param request request message to send.
215- * @param responseType type of response.
216- * @param address of remote target service to invoke.
217- * @return mono publisher completing with single response message or with error.
218- */
219- public Mono <ServiceMessage > requestOne (
220- ServiceMessage request , Type responseType , Address address ) {
221- return Mono .defer (
222- () -> {
223- requireNonNull (address , "requestOne address parameter is required and must not be null" );
224- requireNonNull (transport , "transport is required and must not be null" );
225- return transport
226- .create (address )
227- .requestResponse (request , responseType )
228- .map (this ::throwIfError );
229- });
230- }
231-
232201 /**
233202 * Issues request to service which returns stream of service messages back.
234203 *
@@ -243,49 +212,31 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request) {
243212 * Issues request to service which returns stream of service messages back.
244213 *
245214 * @param request request with given headers.
246- * @param responseType type of responses.
215+ * @param responseType type of responses (optional) .
247216 * @return flux publisher of service responses.
248217 */
249218 public Flux <ServiceMessage > requestMany (ServiceMessage request , Type responseType ) {
250219 return Flux .defer (
251220 () -> {
252- Objects .requireNonNull (request .qualifier (), "qualifier" );
253-
254221 ServiceMethodInvoker methodInvoker ;
255222 if (methodRegistry != null
256223 && (methodInvoker = methodRegistry .getInvoker (request .qualifier ())) != null ) {
257224 // local service
258225 return methodInvoker .invokeMany (request ).map (this ::throwIfError );
259226 } else {
260227 // remote service
261- return Mono .fromCallable (() -> addressLookup (request ))
262- .flatMapMany (address -> requestMany (request , responseType , address ));
228+ Objects .requireNonNull (transport , "[requestMany] transport" );
229+ return Mono .fromCallable (() -> serviceLookup (request ))
230+ .flatMapMany (
231+ serviceReference ->
232+ transport
233+ .create (serviceReference )
234+ .requestStream (request , responseType )
235+ .map (this ::throwIfError ));
263236 }
264237 });
265238 }
266239
267- /**
268- * Given an address issues request to remote service which returns stream of service messages
269- * back.
270- *
271- * @param request request with given headers.
272- * @param responseType type of responses.
273- * @param address of remote target service to invoke.
274- * @return flux publisher of service responses.
275- */
276- public Flux <ServiceMessage > requestMany (
277- ServiceMessage request , Type responseType , Address address ) {
278- return Flux .defer (
279- () -> {
280- requireNonNull (address , "requestMany address parameter is required and must not be null" );
281- requireNonNull (transport , "transport is required and must not be null" );
282- return transport
283- .create (address )
284- .requestStream (request , responseType )
285- .map (this ::throwIfError );
286- });
287- }
288-
289240 /**
290241 * Issues stream of service requests to service which returns stream of service messages back.
291242 *
@@ -300,7 +251,7 @@ public Flux<ServiceMessage> requestBidirectional(Publisher<ServiceMessage> publi
300251 * Issues stream of service requests to service which returns stream of service messages back.
301252 *
302253 * @param publisher of service requests.
303- * @param responseType type of responses.
254+ * @param responseType type of responses (optional) .
304255 * @return flux publisher of service responses.
305256 */
306257 public Flux <ServiceMessage > requestBidirectional (
@@ -310,47 +261,27 @@ public Flux<ServiceMessage> requestBidirectional(
310261 (first , messages ) -> {
311262 if (first .hasValue ()) {
312263 ServiceMessage request = first .get ();
313- Objects .requireNonNull (request .qualifier (), "qualifier" );
314-
315264 ServiceMethodInvoker methodInvoker ;
316265 if (methodRegistry != null
317266 && (methodInvoker = methodRegistry .getInvoker (request .qualifier ())) != null ) {
318267 // local service
319268 return methodInvoker .invokeBidirectional (messages ).map (this ::throwIfError );
320269 } else {
321270 // remote service
322- return Mono .fromCallable (() -> addressLookup (request ))
271+ Objects .requireNonNull (transport , "[requestBidirectional] transport" );
272+ return Mono .fromCallable (() -> serviceLookup (request ))
323273 .flatMapMany (
324- address -> requestBidirectional (messages , responseType , address ));
274+ serviceReference ->
275+ transport
276+ .create (serviceReference )
277+ .requestChannel (messages , responseType )
278+ .map (this ::throwIfError ));
325279 }
326280 }
327281 return messages ;
328282 });
329283 }
330284
331- /**
332- * Given an address issues stream of service requests to service which returns stream of service
333- * messages back.
334- *
335- * @param publisher of service requests.
336- * @param responseType type of responses.
337- * @param address of remote target service to invoke.
338- * @return flux publisher of service responses.
339- */
340- public Flux <ServiceMessage > requestBidirectional (
341- Publisher <ServiceMessage > publisher , Type responseType , Address address ) {
342- return Flux .defer (
343- () -> {
344- requireNonNull (
345- address , "requestBidirectional address parameter is required and must not be null" );
346- requireNonNull (transport , "transport is required and must not be null" );
347- return transport
348- .create (address )
349- .requestChannel (publisher , responseType )
350- .map (this ::throwIfError );
351- });
352- }
353-
354285 /**
355286 * Create proxy creates a java generic proxy instance by a given service interface.
356287 *
@@ -416,10 +347,9 @@ public Object invoke(Object proxy, Method method, Object[] params) {
416347 });
417348 }
418349
419- private Address addressLookup (ServiceMessage request ) {
350+ private ServiceReference serviceLookup (ServiceMessage request ) {
420351 return router
421352 .route (serviceRegistry , request )
422- .map (ServiceReference ::address )
423353 .orElseThrow (() -> noReachableMemberException (request ));
424354 }
425355
0 commit comments