88import static io .scalecube .services .Reflect .requestType ;
99import static io .scalecube .services .Reflect .restMethod ;
1010import static io .scalecube .services .Reflect .serviceName ;
11+ import static io .scalecube .services .api .ServiceMessage .HEADER_PROPAGATE_DATA_TYPE_HEADER ;
1112import static io .scalecube .services .auth .Principal .NULL_PRINCIPAL ;
1213
1314import io .scalecube .services .api .ErrorData ;
2122import io .scalecube .services .routing .Router ;
2223import io .scalecube .services .routing .Routers ;
2324import io .scalecube .services .transport .api .ClientTransport ;
25+ import io .scalecube .services .transport .api .ServiceMessageDataDecoder ;
2426import java .lang .reflect .Method ;
2527import java .lang .reflect .Proxy ;
2628import java .lang .reflect .Type ;
2931import java .util .Map ;
3032import java .util .Objects ;
3133import java .util .Optional ;
34+ import java .util .concurrent .ConcurrentHashMap ;
3235import java .util .function .Function ;
3336import org .reactivestreams .Publisher ;
3437import reactor .core .Exceptions ;
@@ -44,8 +47,9 @@ public class ServiceCall implements AutoCloseable {
4447 private ServiceClientErrorMapper errorMapper = DefaultErrorMapper .INSTANCE ;
4548 private Map <String , String > credentials = Collections .emptyMap ();
4649 private String contentType = ServiceMessage .DEFAULT_DATA_FORMAT ;
50+ private ServiceMessageDataDecoder dataDecoder = ServiceMessageDataDecoder .INSTANCE ;
4751
48- // private Logger logger ;
52+ private final Map < String , Type > resolvedTypes = new ConcurrentHashMap <>() ;
4953
5054 public ServiceCall () {}
5155
@@ -55,6 +59,7 @@ private ServiceCall(ServiceCall other) {
5559 this .router = other .router ;
5660 this .errorMapper = other .errorMapper ;
5761 this .contentType = other .contentType ;
62+ this .dataDecoder = other .dataDecoder ;
5863 this .credentials = Collections .unmodifiableMap (new HashMap <>(other .credentials ));
5964 }
6065
@@ -142,6 +147,18 @@ public ServiceCall contentType(String contentType) {
142147 return target ;
143148 }
144149
150+ /**
151+ * Setter for {@code dataDecoder}.
152+ *
153+ * @param dataDecoder dataDecoder.
154+ * @return new {@link ServiceCall} instance.
155+ */
156+ public ServiceCall dataDecoder (ServiceMessageDataDecoder dataDecoder ) {
157+ ServiceCall target = new ServiceCall (this );
158+ target .dataDecoder = dataDecoder ;
159+ return target ;
160+ }
161+
145162 /**
146163 * Invokes fire-and-forget request.
147164 *
@@ -178,7 +195,7 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType
178195 // local service
179196 return methodInvoker
180197 .invokeOne (request )
181- .map (this :: throwIfError )
198+ .map (message -> onMessage ( message , responseType ) )
182199 .contextWrite (
183200 context -> {
184201 if (context .hasKey (RequestContext .class )) {
@@ -198,8 +215,8 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType
198215 serviceReference ->
199216 transport
200217 .create (serviceReference )
201- .requestResponse (request , responseType )
202- .map (this :: throwIfError ));
218+ .requestResponse (request )
219+ .map (message -> onMessage ( message , responseType ) ));
203220 }
204221 });
205222 }
@@ -230,7 +247,7 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseTyp
230247 // local service
231248 return methodInvoker
232249 .invokeMany (request )
233- .map (this :: throwIfError )
250+ .map (message -> onMessage ( message , responseType ) )
234251 .contextWrite (
235252 context -> {
236253 if (context .hasKey (RequestContext .class )) {
@@ -250,8 +267,8 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseTyp
250267 serviceReference ->
251268 transport
252269 .create (serviceReference )
253- .requestStream (request , responseType )
254- .map (this :: throwIfError ));
270+ .requestStream (request )
271+ .map (message -> onMessage ( message , responseType ) ));
255272 }
256273 });
257274 }
@@ -286,7 +303,7 @@ public Flux<ServiceMessage> requestBidirectional(
286303 // local service
287304 return methodInvoker
288305 .invokeBidirectional (messages )
289- .map (this :: throwIfError )
306+ .map (message -> onMessage ( message , responseType ) )
290307 .contextWrite (
291308 context -> {
292309 if (context .hasKey (RequestContext .class )) {
@@ -306,8 +323,8 @@ public Flux<ServiceMessage> requestBidirectional(
306323 serviceReference ->
307324 transport
308325 .create (serviceReference )
309- .requestChannel (messages , responseType )
310- .map (this :: throwIfError ));
326+ .requestChannel (messages )
327+ .map (message -> onMessage ( message , responseType ) ));
311328 }
312329 }
313330 return messages ;
@@ -354,10 +371,9 @@ public <T> T api(Class<T> serviceInterface) {
354371 case REQUEST_CHANNEL :
355372 // this is REQUEST_CHANNEL so it means params[0] must
356373 // be a publisher - its safe to cast.
357- //noinspection rawtypes
358374 return serviceCall
359375 .requestBidirectional (
360- Flux .from ((Publisher ) request )
376+ Flux .from ((Publisher <?> ) request )
361377 .map (data -> toServiceMessage (methodInfo , data )),
362378 returnType )
363379 .transform (asFlux (isReturnTypeServiceMessage ));
@@ -376,18 +392,15 @@ private ServiceReference serviceLookup(ServiceMessage request) {
376392 }
377393
378394 private ServiceMessage toServiceMessage (MethodInfo methodInfo , Object request ) {
379- if (request instanceof ServiceMessage ) {
380- return ServiceMessage .from ((ServiceMessage ) request )
381- .qualifier (methodInfo .serviceName (), methodInfo .methodName ())
382- .headers (credentials )
383- .dataFormatIfAbsent (contentType )
384- .build ();
385- }
395+ final var builder =
396+ request instanceof ServiceMessage
397+ ? ServiceMessage .from ((ServiceMessage ) request )
398+ : ServiceMessage .builder ().data (request );
386399
387- return ServiceMessage . builder ()
400+ return builder
388401 .qualifier (methodInfo .serviceName (), methodInfo .methodName ())
389402 .headers (credentials )
390- .data ( request )
403+ .header ( HEADER_PROPAGATE_DATA_TYPE_HEADER , true )
391404 .dataFormatIfAbsent (contentType )
392405 .build ();
393406 }
@@ -438,13 +451,6 @@ private static Function<Mono<ServiceMessage>, Mono<Object>> asMono(
438451 : mono .mapNotNull (ServiceMessage ::data );
439452 }
440453
441- private ServiceMessage throwIfError (ServiceMessage message ) {
442- if (message .isError () && message .hasData (ErrorData .class )) {
443- throw Exceptions .propagate (errorMapper .toError (message ));
444- }
445- return message ;
446- }
447-
448454 private static MethodInfo getMethodInfo (Class <?> serviceInterface , Method method ) {
449455 return new MethodInfo (
450456 serviceName (serviceInterface ),
@@ -461,6 +467,35 @@ private static MethodInfo getMethodInfo(Class<?> serviceInterface, Method method
461467 Collections .emptyList ());
462468 }
463469
470+ private ServiceMessage onMessage (ServiceMessage message , Type returnType ) {
471+ if (returnType == null ) {
472+ return message ;
473+ }
474+
475+ if (message .isError ()) {
476+ throw Exceptions .propagate (
477+ errorMapper .toError (dataDecoder .decodeData (message , ErrorData .class )));
478+ }
479+
480+ return dataDecoder .decodeData (
481+ message , TypeUtil .isWildcardType (returnType ) ? getDataType (message ) : returnType );
482+ }
483+
484+ private Type getDataType (ServiceMessage message ) {
485+ final var dataType = message .header (ServiceMessage .HEADER_DATA_TYPE );
486+
487+ if (dataType == null ) {
488+ return Object .class ;
489+ }
490+
491+ return resolvedTypes .computeIfAbsent (
492+ dataType ,
493+ s -> {
494+ final var type = TypeUtil .parseTypeDescriptor (dataType );
495+ return type != null ? type : Object .class ;
496+ });
497+ }
498+
464499 @ Override
465500 public void close () {
466501 if (transport != null ) {
0 commit comments