55import io .scalecube .net .Address ;
66import io .scalecube .services .api .ErrorData ;
77import io .scalecube .services .api .ServiceMessage ;
8- import io .scalecube .services .exceptions .DefaultErrorMapper ;
98import io .scalecube .services .exceptions .ServiceClientErrorMapper ;
109import io .scalecube .services .exceptions .ServiceUnavailableException ;
1110import io .scalecube .services .methods .MethodInfo ;
2221import java .util .HashMap ;
2322import java .util .Map ;
2423import java .util .Optional ;
25- import java .util .concurrent .Callable ;
26- import java .util .function .Consumer ;
2724import java .util .function .Function ;
2825import org .reactivestreams .Publisher ;
2926import org .slf4j .Logger ;
@@ -39,66 +36,33 @@ public class ServiceCall {
3936 private static final ServiceMessage UNEXPECTED_EMPTY_RESPONSE =
4037 ServiceMessage .error (503 , 503 , "Unexpected empty response" );
4138
42- private ClientTransport transport ;
43- private ServiceMethodRegistry methodRegistry ;
44- private ServiceRegistry serviceRegistry ;
39+ private final ClientTransport transport ;
40+ private final ServiceMethodRegistry methodRegistry ;
41+ private final ServiceRegistry serviceRegistry ;
42+
4543 private Router router ;
46- private ServiceClientErrorMapper errorMapper = DefaultErrorMapper .INSTANCE ;
47- private Consumer <Object > requestReleaser =
48- req -> {
49- // no-op
50- };
44+ private ServiceClientErrorMapper errorMapper ;
5145 private Map <String , String > credentials = Collections .emptyMap ();
5246 private String contentType ;
5347
54- /** Default constructor. */
55- public ServiceCall () {}
48+ ServiceCall (
49+ ClientTransport transport ,
50+ ServiceMethodRegistry methodRegistry ,
51+ ServiceRegistry serviceRegistry ) {
52+ this .transport = transport ;
53+ this .methodRegistry = methodRegistry ;
54+ this .serviceRegistry = serviceRegistry ;
55+ }
5656
5757 private ServiceCall (ServiceCall other ) {
5858 this .transport = other .transport ;
5959 this .methodRegistry = other .methodRegistry ;
6060 this .serviceRegistry = other .serviceRegistry ;
61+ // Set resettable fields
6162 this .router = other .router ;
6263 this .errorMapper = other .errorMapper ;
6364 this .contentType = other .contentType ;
64- this .requestReleaser = other .requestReleaser ;
65- this .credentials = new HashMap <>(other .credentials );
66- }
67-
68- /**
69- * Creates new {@link ServiceCall}'s definition with a given client transport.
70- *
71- * @param clientTransport client transport.
72- * @return new {@link ServiceCall} instance.
73- */
74- public ServiceCall transport (ClientTransport clientTransport ) {
75- ServiceCall target = new ServiceCall (this );
76- target .transport = clientTransport ;
77- return target ;
78- }
79-
80- /**
81- * Creates new {@link ServiceCall}'s definition with a given service registry.
82- *
83- * @param serviceRegistry service registry.
84- * @return new {@link ServiceCall} instance.
85- */
86- public ServiceCall serviceRegistry (ServiceRegistry serviceRegistry ) {
87- ServiceCall target = new ServiceCall (this );
88- target .serviceRegistry = serviceRegistry ;
89- return target ;
90- }
91-
92- /**
93- * Creates new {@link ServiceCall}'s definition with a given method registry.
94- *
95- * @param methodRegistry method registry.
96- * @return new {@link ServiceCall} instance.
97- */
98- public ServiceCall methodRegistry (ServiceMethodRegistry methodRegistry ) {
99- ServiceCall target = new ServiceCall (this );
100- target .methodRegistry = methodRegistry ;
101- return target ;
65+ this .credentials = Collections .unmodifiableMap (new HashMap <>(other .credentials ));
10266 }
10367
10468 /**
@@ -137,18 +101,6 @@ public ServiceCall errorMapper(ServiceClientErrorMapper errorMapper) {
137101 return target ;
138102 }
139103
140- /**
141- * Creates new {@link ServiceCall}'s definition with a given requestReleaser.
142- *
143- * @param requestReleaser given.
144- * @return new {@link ServiceCall} instance.
145- */
146- public ServiceCall requestReleaser (Consumer <Object > requestReleaser ) {
147- ServiceCall target = new ServiceCall (this );
148- target .requestReleaser = requestReleaser ;
149- return target ;
150- }
151-
152104 /**
153105 * Creates new {@link ServiceCall}'s definition with a given credentials.
154106 *
@@ -219,11 +171,12 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType
219171 && methodRegistry .containsInvoker (qualifier )) { // local service
220172 return methodRegistry
221173 .getInvoker (request .qualifier ())
222- .invokeOne (request , requestReleaser )
174+ .invokeOne (request )
223175 .map (this ::throwIfError );
224176 } else {
225- return addressLookup (request )
226- .flatMap (address -> requestOne (request , responseType , address )); // remote service
177+ // remote service
178+ return Mono .fromCallable (() -> addressLookup (request ))
179+ .flatMap (address -> requestOne (request , responseType , address ));
227180 }
228181 });
229182 }
@@ -274,12 +227,13 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseTyp
274227 && methodRegistry .containsInvoker (qualifier )) { // local service
275228 return methodRegistry
276229 .getInvoker (request .qualifier ())
277- .invokeMany (request , requestReleaser )
230+ .invokeMany (request )
278231 .map (this ::throwIfError );
279232 } else {
280- return addressLookup (request )
233+ // remote service
234+ return Mono .fromCallable (() -> addressLookup (request ))
281235 .flatMapMany (
282- address -> requestMany (request , responseType , address )); // remote service
236+ address -> requestMany (request , responseType , address ));
283237 }
284238 });
285239 }
@@ -335,11 +289,11 @@ public Flux<ServiceMessage> requestBidirectional(
335289 && methodRegistry .containsInvoker (qualifier )) { // local service
336290 return methodRegistry
337291 .getInvoker (qualifier )
338- .invokeBidirectional (messages , requestReleaser )
292+ .invokeBidirectional (messages )
339293 .map (this ::throwIfError );
340294 } else {
341295 // remote service
342- return addressLookup (request )
296+ return Mono . fromCallable (() -> addressLookup (request ) )
343297 .flatMapMany (
344298 address -> requestBidirectional (messages , responseType , address ));
345299 }
@@ -437,14 +391,11 @@ public Object invoke(Object proxy, Method method, Object[] params) {
437391 });
438392 }
439393
440- private Mono <Address > addressLookup (ServiceMessage request ) {
441- Callable <Address > callable =
442- () ->
443- router
444- .route (serviceRegistry , request )
445- .map (ServiceReference ::address )
446- .orElseThrow (() -> noReachableMemberException (request ));
447- return Mono .fromCallable (callable ).doOnError (th -> applyRequestReleaser (request ));
394+ private Address addressLookup (ServiceMessage request ) {
395+ return router
396+ .route (serviceRegistry , request )
397+ .map (ServiceReference ::address )
398+ .orElseThrow (() -> noReachableMemberException (request ));
448399 }
449400
450401 private ServiceMessage toServiceMessage (MethodInfo methodInfo , Object request ) {
@@ -517,10 +468,4 @@ private ServiceMessage throwIfError(ServiceMessage message) {
517468 }
518469 return message ;
519470 }
520-
521- private void applyRequestReleaser (ServiceMessage request ) {
522- if (request .data () != null ) {
523- requestReleaser .accept (request .data ());
524- }
525- }
526471}
0 commit comments