Skip to content

Commit ca8974f

Browse files
committed
Add functionality for custom codecs installation
1 parent f174477 commit ca8974f

File tree

8 files changed

+147
-20
lines changed

8 files changed

+147
-20
lines changed

services-api/src/main/java/io/scalecube/services/ServiceCall.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class ServiceCall {
4848
// no-op
4949
};
5050
private Map<String, String> credentials = Collections.emptyMap();
51+
private String contentType;
5152

5253
/** Default constructor. */
5354
public ServiceCall() {}
@@ -58,6 +59,7 @@ private ServiceCall(ServiceCall other) {
5859
this.serviceRegistry = other.serviceRegistry;
5960
this.router = other.router;
6061
this.errorMapper = other.errorMapper;
62+
this.contentType = other.contentType;
6163
}
6264

6365
/**
@@ -156,6 +158,18 @@ public ServiceCall credentials(Map<String, String> credentials) {
156158
return target;
157159
}
158160

161+
/**
162+
* Creates new {@link ServiceCall}'s definition with a given content type.
163+
*
164+
* @param contentType content type.
165+
* @return new {@link ServiceCall} instance.
166+
*/
167+
public ServiceCall contentType(String contentType) {
168+
ServiceCall target = new ServiceCall(this);
169+
target.contentType = contentType;
170+
return target;
171+
}
172+
159173
/**
160174
* Issues fire-and-forget request.
161175
*
@@ -434,13 +448,15 @@ private ServiceMessage toServiceMessage(MethodInfo methodInfo, Object request) {
434448
return ServiceMessage.from((ServiceMessage) request)
435449
.qualifier(methodInfo.serviceName(), methodInfo.methodName())
436450
.headers(credentials)
451+
.dataFormatIfAbsent(contentType)
437452
.build();
438453
}
439454

440455
return ServiceMessage.builder()
441456
.qualifier(methodInfo.serviceName(), methodInfo.methodName())
442457
.headers(credentials)
443458
.data(request)
459+
.dataFormatIfAbsent(contentType)
444460
.build();
445461
}
446462

services-api/src/main/java/io/scalecube/services/api/ServiceMessage.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,20 @@ public Builder dataFormat(String dataFormat) {
213213
return this;
214214
}
215215

216+
/**
217+
* Sets a given data format.
218+
*
219+
* @param dataFormat data format, can be optional
220+
* @return self
221+
*/
222+
public Builder dataFormatIfAbsent(String dataFormat) {
223+
if (dataFormat == null) {
224+
return this;
225+
}
226+
headers.putIfAbsent(HEADER_DATA_FORMAT, dataFormat);
227+
return this;
228+
}
229+
216230
private Map<String, String> headers() {
217231
return this.headers;
218232
}

services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public Mono<ServiceMessage> invokeOne(ServiceMessage message, Consumer<Object> r
6767
return authenticate(message)
6868
.doOnError(th -> applyRequestReleaser(message, requestReleaser))
6969
.flatMap(principal -> Mono.from(invoke(toRequest(message), principal)))
70-
.map(this::toResponse)
70+
.map(response -> toResponse(response, message.dataFormat()))
7171
.onErrorResume(throwable -> Mono.just(errorMapper.toMessage(throwable)));
7272
}
7373

@@ -82,7 +82,7 @@ public Flux<ServiceMessage> invokeMany(ServiceMessage message, Consumer<Object>
8282
return authenticate(message)
8383
.doOnError(th -> applyRequestReleaser(message, requestReleaser))
8484
.flatMapMany(principal -> Flux.from(invoke(toRequest(message), principal)))
85-
.map(this::toResponse)
85+
.map(response -> toResponse(response, message.dataFormat()))
8686
.onErrorResume(throwable -> Flux.just(errorMapper.toMessage(throwable)));
8787
}
8888

@@ -104,8 +104,8 @@ public Flux<ServiceMessage> invokeBidirectional(
104104
principal ->
105105
messages
106106
.map(this::toRequest)
107-
.transform(request -> invoke(request, principal))))
108-
.map(this::toResponse)
107+
.transform(request -> invoke(request, principal)))
108+
.map(response -> toResponse(response, first.get().dataFormat())))
109109
.onErrorResume(throwable -> Flux.just(errorMapper.toMessage(throwable)));
110110
}
111111

@@ -187,10 +187,19 @@ private Object toRequest(ServiceMessage message) {
187187
return methodInfo.isRequestTypeServiceMessage() ? request : request.data();
188188
}
189189

190-
private ServiceMessage toResponse(Object response) {
191-
return (response instanceof ServiceMessage)
192-
? (ServiceMessage) response
193-
: ServiceMessage.builder().qualifier(methodInfo.qualifier()).data(response).build();
190+
private ServiceMessage toResponse(Object response, String dataFormat) {
191+
if (response instanceof ServiceMessage) {
192+
ServiceMessage message = (ServiceMessage) response;
193+
if (dataFormat != null && !dataFormat.equals(message.dataFormat())) {
194+
return ServiceMessage.from(message).dataFormat(dataFormat).build();
195+
}
196+
return message;
197+
}
198+
return ServiceMessage.builder()
199+
.qualifier(methodInfo.qualifier())
200+
.data(response)
201+
.dataFormatIfAbsent(dataFormat)
202+
.build();
194203
}
195204

196205
private void applyRequestReleaser(ServiceMessage request, Consumer<Object> requestReleaser) {

services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.scalecube.services.ServiceEndpoint;
1212
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
1313
import io.scalecube.services.discovery.api.ServiceDiscovery;
14+
import io.scalecube.services.transport.api.HeadersCodec;
1415
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
1516
import java.nio.file.Path;
1617
import java.util.List;
@@ -69,7 +70,8 @@ public static void main(String[] args) {
6970
.wiretap(false)
7071
.port(config.servicePort())
7172
.runOn(loopResources)
72-
.noSSL()))
73+
.noSSL())
74+
.headersCodec(HeadersCodec.getInstance("application/json")))
7375
.services(new BenchmarkServiceImpl(), new GreetingServiceImpl())
7476
.startAwait()
7577
.onShutdown()
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package io.scalecube.services.examples.codecs;
2+
3+
import io.scalecube.net.Address;
4+
import io.scalecube.services.Microservices;
5+
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
6+
import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl;
7+
import io.scalecube.services.examples.helloworld.service.api.GreetingsService;
8+
import io.scalecube.services.transport.api.DataCodec;
9+
import io.scalecube.services.transport.api.HeadersCodec;
10+
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
11+
12+
public class Example1 {
13+
14+
public static final String CONTENT_TYPE = "application/protostuff";
15+
private static final HeadersCodec HEADERS_CODEC = HeadersCodec.getInstance(CONTENT_TYPE);
16+
private static final DataCodec DATA_CODEC = DataCodec.getInstance(CONTENT_TYPE);
17+
18+
/**
19+
* Start the example.
20+
*
21+
* @param args ignored
22+
*/
23+
public static void main(String[] args) {
24+
// ScaleCube Node node with no members
25+
Microservices seed =
26+
Microservices.builder()
27+
.discovery(ScalecubeServiceDiscovery::new)
28+
.transport(() -> new RSocketServiceTransport().headersCodec(HEADERS_CODEC))
29+
.defaultDataEncoder(DATA_CODEC) // need to send with non-default data format
30+
.startAwait();
31+
32+
final Address seedAddress = seed.discovery().address();
33+
34+
// Construct a ScaleCube node which joins the cluster hosting the Greeting Service
35+
Microservices ms =
36+
Microservices.builder()
37+
.discovery(
38+
endpoint ->
39+
new ScalecubeServiceDiscovery(endpoint)
40+
.membership(cfg -> cfg.seedMembers(seedAddress)))
41+
.transport(() -> new RSocketServiceTransport().headersCodec(HEADERS_CODEC))
42+
.services(new GreetingServiceImpl())
43+
.startAwait();
44+
45+
// Create service proxy
46+
GreetingsService service = seed.call().api(GreetingsService.class);
47+
48+
// Execute the services and subscribe to service events
49+
service.sayHello("joe").subscribe(consumer -> System.out.println(consumer.message()));
50+
51+
seed.onShutdown().block();
52+
ms.onShutdown().block();
53+
}
54+
}

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
import io.netty.util.concurrent.Future;
99
import io.scalecube.services.api.ServiceMessage;
1010
import io.scalecube.services.transport.api.ClientTransport;
11+
import io.scalecube.services.transport.api.DataCodec;
1112
import io.scalecube.services.transport.api.HeadersCodec;
1213
import io.scalecube.services.transport.api.ReferenceCountUtil;
1314
import io.scalecube.services.transport.api.ServerTransport;
1415
import io.scalecube.services.transport.api.ServiceMessageCodec;
1516
import io.scalecube.services.transport.api.ServiceTransport;
1617
import java.net.InetSocketAddress;
18+
import java.util.Collection;
1719
import java.util.concurrent.ThreadFactory;
1820
import java.util.function.Function;
1921
import reactor.core.publisher.Flux;
@@ -27,7 +29,6 @@
2729
/** RSocket service transport. */
2830
public class RSocketServiceTransport implements ServiceTransport {
2931

30-
private static final HeadersCodec HEADERS_CODEC = HeadersCodec.getInstance("application/json");
3132
private static final int NUM_OF_WORKERS = Runtime.getRuntime().availableProcessors();
3233

3334
static {
@@ -38,7 +39,8 @@ public class RSocketServiceTransport implements ServiceTransport {
3839
}
3940

4041
private int numOfWorkers = NUM_OF_WORKERS;
41-
private HeadersCodec headersCodec = HEADERS_CODEC;
42+
private HeadersCodec headersCodec;
43+
private Collection<DataCodec> dataCodecs;
4244
private Function<LoopResources, TcpServer> tcpServerProvider = defaultTcpServerProvider();
4345
private Function<LoopResources, TcpClient> tcpClientProvider = defaultTcpClientProvider();
4446

@@ -58,6 +60,7 @@ public RSocketServiceTransport() {}
5860
private RSocketServiceTransport(RSocketServiceTransport other) {
5961
this.numOfWorkers = other.numOfWorkers;
6062
this.headersCodec = other.headersCodec;
63+
this.dataCodecs = other.dataCodecs;
6164
this.eventLoopGroup = other.eventLoopGroup;
6265
this.clientLoopResources = other.clientLoopResources;
6366
this.serverLoopResources = other.serverLoopResources;
@@ -87,6 +90,18 @@ public RSocketServiceTransport headersCodec(HeadersCodec headersCodec) {
8790
return rst;
8891
}
8992

93+
/**
94+
* Sets a set of {@code DataCodec}.
95+
*
96+
* @param dataCodecs set of data codecs
97+
* @return new {@code RSocketServiceTransport} instance
98+
*/
99+
public RSocketServiceTransport dataCodec(Collection<DataCodec> dataCodecs) {
100+
RSocketServiceTransport rst = new RSocketServiceTransport(this);
101+
rst.dataCodecs = dataCodecs;
102+
return rst;
103+
}
104+
90105
/**
91106
* Sets a provider function for custom {@code TcpServer}.
92107
*
@@ -119,7 +134,8 @@ public RSocketServiceTransport tcpClient(Function<LoopResources, TcpClient> fact
119134
@Override
120135
public ClientTransport clientTransport() {
121136
return new RSocketClientTransport(
122-
new ServiceMessageCodec(headersCodec), tcpClientProvider.apply(clientLoopResources));
137+
new ServiceMessageCodec(headersCodec, dataCodecs),
138+
tcpClientProvider.apply(clientLoopResources));
123139
}
124140

125141
/**
@@ -130,7 +146,8 @@ public ClientTransport clientTransport() {
130146
@Override
131147
public ServerTransport serverTransport() {
132148
return new RSocketServerTransport(
133-
new ServiceMessageCodec(headersCodec), tcpServerProvider.apply(serverLoopResources));
149+
new ServiceMessageCodec(headersCodec, dataCodecs),
150+
tcpServerProvider.apply(serverLoopResources));
134151
}
135152

136153
@Override

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public final class Microservices {
129129
private final ServiceDiscoveryBootstrap discoveryBootstrap;
130130
private final ServiceProviderErrorMapper errorMapper;
131131
private final ServiceMessageDataDecoder dataDecoder;
132+
private final String dataEncoderContentType;
132133
private final MonoProcessor<Void> shutdown = MonoProcessor.create();
133134
private final MonoProcessor<Void> onShutdown = MonoProcessor.create();
134135

@@ -144,6 +145,7 @@ private Microservices(Builder builder) {
144145
this.transportBootstrap = builder.transportBootstrap;
145146
this.errorMapper = builder.errorMapper;
146147
this.dataDecoder = builder.dataDecoder;
148+
this.dataEncoderContentType = builder.dataEncoderContentType;
147149

148150
// Setup cleanup
149151
shutdown
@@ -255,6 +257,7 @@ public ServiceCall call() {
255257
.transport(transportBootstrap.clientTransport)
256258
.serviceRegistry(serviceRegistry)
257259
.methodRegistry(methodRegistry)
260+
.contentType(dataEncoderContentType)
258261
.router(Routers.getRouter(RoundRobinServiceRouter.class));
259262
}
260263

@@ -330,6 +333,7 @@ public static final class Builder {
330333
private ServiceMessageDataDecoder dataDecoder =
331334
Optional.ofNullable(ServiceMessageDataDecoder.INSTANCE)
332335
.orElse((message, dataType) -> message);
336+
private String dataEncoderContentType;
333337

334338
public Mono<Microservices> start() {
335339
return Mono.defer(() -> new Microservices(this).start());
@@ -417,6 +421,15 @@ public Builder defaultDataDecoder(ServiceMessageDataDecoder dataDecoder) {
417421
this.dataDecoder = dataDecoder;
418422
return this;
419423
}
424+
425+
public Builder defaultDataEncoder(DataCodec dataEncoder) {
426+
return defaultDataEncoderContentType(dataEncoder.contentType());
427+
}
428+
429+
public Builder defaultDataEncoderContentType(String contentType) {
430+
this.dataEncoderContentType = contentType;
431+
return this;
432+
}
420433
}
421434

422435
public static class ServiceDiscoveryBootstrap {

0 commit comments

Comments
 (0)