Skip to content

Commit cc83cd9

Browse files
authored
Merge pull request #791 from harry-hao/feature/generic-is-request-type-service-message
Fixed an error caused by Reflect.isRequestTypeServiceMessage() returning false on Publisher<ServiceMessage>
2 parents c5c40d3 + 2bf60a0 commit cc83cd9

File tree

6 files changed

+219
-12
lines changed

6 files changed

+219
-12
lines changed

services-api/src/main/java/io/scalecube/services/Reflect.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,18 @@ public static Class<?> requestType(Method method) {
115115
* @return true if the first parameter of method is ServiceMessage, otherwise false
116116
*/
117117
public static boolean isRequestTypeServiceMessage(Method method) {
118-
Class<?>[] parameterTypes = method.getParameterTypes();
118+
Type[] parameterTypes = method.getGenericParameterTypes();
119119

120-
return parameterTypes.length > 0 && ServiceMessage.class.equals(parameterTypes[0]);
120+
if (parameterTypes.length < 1) {
121+
return false;
122+
}
123+
124+
if (parameterTypes[0] instanceof ParameterizedType) {
125+
ParameterizedType parameterizedType = (ParameterizedType) parameterTypes[0];
126+
return ServiceMessage.class.equals(parameterizedType.getActualTypeArguments()[0]);
127+
}
128+
129+
return ServiceMessage.class.equals(parameterTypes[0]);
121130
}
122131

123132
/**

services-api/src/test/java/io/scalecube/services/methods/ReflectTest.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import java.lang.reflect.Method;
1111
import java.util.Arrays;
1212
import java.util.stream.Stream;
13+
14+
import io.scalecube.services.api.ServiceMessage;
1315
import org.junit.jupiter.api.Assertions;
1416
import org.junit.jupiter.params.ParameterizedTest;
1517
import org.junit.jupiter.params.provider.Arguments;
@@ -26,7 +28,7 @@ public class ReflectTest {
2628
* @param expectedMode expected mode
2729
*/
2830
@ParameterizedTest
29-
@MethodSource("argsProvider")
31+
@MethodSource("argsCommunicationModeProvider")
3032
public void testCommunicationMode(String methodName, CommunicationMode expectedMode) {
3133
// Given:
3234
Method m =
@@ -40,12 +42,47 @@ public void testCommunicationMode(String methodName, CommunicationMode expectedM
4042
Assertions.assertEquals(expectedMode, communicationMode, "Invalid communicationMode");
4143
}
4244

43-
static Stream<Arguments> argsProvider() {
45+
static Stream<Arguments> argsCommunicationModeProvider() {
4446
return Stream.of(
4547
Arguments.of("fireAndForget", FIRE_AND_FORGET),
48+
Arguments.of("emptyResponse", REQUEST_RESPONSE),
4649
Arguments.of("requestResponse", REQUEST_RESPONSE),
4750
Arguments.of("requestStream", REQUEST_STREAM),
48-
Arguments.of("requestChannel", REQUEST_CHANNEL));
51+
Arguments.of("requestChannel", REQUEST_CHANNEL),
52+
Arguments.of("fireAndForgetMessage", FIRE_AND_FORGET),
53+
Arguments.of("emptyResponseMessage", REQUEST_RESPONSE),
54+
Arguments.of("requestResponseMessage", REQUEST_RESPONSE),
55+
Arguments.of("requestStreamMessage", REQUEST_STREAM),
56+
Arguments.of("requestChannelMessage", REQUEST_CHANNEL));
57+
}
58+
59+
@ParameterizedTest
60+
@MethodSource("argsIsRequestTypeServiceMessage")
61+
public void testIsRequestTypeServiceMessage(String methodName, boolean expect) {
62+
// Given:
63+
Method method = Arrays.stream(ReflectTest.TestService.class.getMethods())
64+
.filter(meth -> meth.getName().equals(methodName))
65+
.findFirst()
66+
.get();
67+
// When:
68+
boolean actual = Reflect.isRequestTypeServiceMessage(method);
69+
// Then:
70+
Assertions.assertEquals(expect, actual,
71+
String.format("isRequestTypeServiceMessage(%s) should be %b", methodName, expect));
72+
}
73+
74+
static Stream<Arguments> argsIsRequestTypeServiceMessage() {
75+
return Stream.of(
76+
Arguments.of("fireAndForget", false),
77+
Arguments.of("emptyResponse", false),
78+
Arguments.of("requestResponse", false),
79+
Arguments.of("requestStream", false),
80+
Arguments.of("requestChannel", false),
81+
Arguments.of("fireAndForgetMessage", true),
82+
Arguments.of("emptyResponseMessage", true),
83+
Arguments.of("requestResponseMessage", true),
84+
Arguments.of("requestStreamMessage", true),
85+
Arguments.of("requestChannelMessage", true));
4986
}
5087

5188
private interface TestService {
@@ -58,5 +95,16 @@ private interface TestService {
5895
Flux<Integer> requestStream(Integer i);
5996

6097
Flux<Integer> requestChannel(Flux<Integer> i);
98+
99+
void fireAndForgetMessage(ServiceMessage sm);
100+
101+
Mono<Void> emptyResponseMessage(ServiceMessage sm);
102+
103+
Mono<ServiceMessage> requestResponseMessage(ServiceMessage sm);
104+
105+
Flux<ServiceMessage> requestStreamMessage(ServiceMessage sm);
106+
107+
Flux<ServiceMessage> requestChannelMessage(Flux<ServiceMessage> sm);
108+
61109
}
62110
}

services/src/test/java/io/scalecube/services/ServiceLocalTest.java

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
import org.junit.jupiter.api.AfterEach;
1515
import org.junit.jupiter.api.BeforeEach;
1616
import org.junit.jupiter.api.Test;
17-
import reactor.core.publisher.EmitterProcessor;
18-
import reactor.core.publisher.Flux;
19-
import reactor.core.publisher.Mono;
17+
import reactor.core.publisher.*;
2018
import reactor.test.StepVerifier;
2119

2220
public class ServiceLocalTest extends BaseTest {
@@ -260,6 +258,23 @@ public void test_local_bidi_greeting_expect_IllegalArgumentException() {
260258
.verify(Duration.ofSeconds(3));
261259
}
262260

261+
@Test
262+
public void test_local_bidi_greeting_message_expect_IllegalArgumentException() {
263+
// get a proxy to the service api.
264+
GreetingService service = createProxy(microservices);
265+
266+
// call the service. bidiThrowingGreeting
267+
Flux<GreetingResponse> responses =
268+
service.bidiGreetingIllegalArgumentExceptionMessage(
269+
Mono.just(ServiceMessage.builder()
270+
.data(new GreetingRequest("IllegalArgumentException")).build()))
271+
.map(ServiceMessage::data);
272+
// call the service.
273+
StepVerifier.create(responses)
274+
.expectErrorMessage("IllegalArgumentException")
275+
.verify(Duration.ofSeconds(3));
276+
}
277+
263278
@Test
264279
public void test_local_bidi_greeting_expect_NotAuthorized() {
265280
// get a proxy to the service api.
@@ -279,6 +294,24 @@ public void test_local_bidi_greeting_expect_NotAuthorized() {
279294
.verify(Duration.ofSeconds(3));
280295
}
281296

297+
@Test
298+
public void test_local_bidi_greeting_message_expect_NotAuthorized() {
299+
// get a proxy to the service api.
300+
GreetingService service = createProxy(microservices);
301+
302+
DirectProcessor<GreetingRequest> requests = DirectProcessor.create();
303+
304+
// call the service.
305+
Flux<GreetingResponse> responses = service.bidiGreetingNotAuthorizedMessage(
306+
requests.map(request -> ServiceMessage.builder().data(request).build()))
307+
.map(ServiceMessage::data);
308+
309+
StepVerifier.create(responses)
310+
.then(() -> requests.onNext(new GreetingRequest("joe-1")))
311+
.expectErrorMessage("Not authorized")
312+
.verify(Duration.ofSeconds(3));
313+
}
314+
282315
@Test
283316
public void test_local_bidi_greeting_expect_GreetingResponse() {
284317
// get a proxy to the service api.
@@ -304,6 +337,29 @@ public void test_local_bidi_greeting_expect_GreetingResponse() {
304337
.verify(Duration.ofSeconds(3));
305338
}
306339

340+
@Test
341+
public void test_local_bidi_greeting_expect_message_GreetingResponse() {
342+
// get a proxy to the service api.
343+
GreetingService service = createProxy(microservices);
344+
345+
UnicastProcessor<GreetingRequest> requests = UnicastProcessor.create();
346+
// call the service.
347+
Flux<GreetingResponse> responses = service.bidiGreetingMessage(requests
348+
.map(request -> ServiceMessage.builder().data(request).build()))
349+
.map(ServiceMessage::data);
350+
351+
StepVerifier.create(responses)
352+
.then(() -> requests.onNext(new GreetingRequest("joe-1")))
353+
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-1"))
354+
.then(() -> requests.onNext(new GreetingRequest("joe-2")))
355+
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-2"))
356+
.then(() -> requests.onNext(new GreetingRequest("joe-3")))
357+
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-3"))
358+
.then(() -> requests.onComplete())
359+
.expectComplete()
360+
.verify(Duration.ofSeconds(3));
361+
}
362+
307363
private GreetingService createProxy(Microservices gateway) {
308364
return gateway.call().api(GreetingService.class); // create proxy for GreetingService API
309365
}

services/src/test/java/io/scalecube/services/ServiceRemoteTest.java

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,7 @@
2727
import org.junit.jupiter.api.Disabled;
2828
import org.junit.jupiter.api.Test;
2929
import org.reactivestreams.Publisher;
30-
import reactor.core.publisher.EmitterProcessor;
31-
import reactor.core.publisher.Flux;
32-
import reactor.core.publisher.Hooks;
33-
import reactor.core.publisher.Mono;
30+
import reactor.core.publisher.*;
3431
import reactor.test.StepVerifier;
3532

3633
public class ServiceRemoteTest extends BaseTest {
@@ -369,6 +366,25 @@ public void test_remote_bidi_greeting_expect_IllegalArgumentException() {
369366
.verify(Duration.ofSeconds(3));
370367
}
371368

369+
@Test
370+
public void test_remote_bidi_greeting_message_expect_IllegalArgumentException() {
371+
372+
// get a proxy to the service api.
373+
GreetingService service = createProxy();
374+
375+
// call the service. bidiThrowingGreeting
376+
Flux<GreetingResponse> responses =
377+
service.bidiGreetingIllegalArgumentExceptionMessage(
378+
Mono.just(ServiceMessage.builder()
379+
.data(new GreetingRequest("IllegalArgumentException")).build()))
380+
.map(ServiceMessage::data);
381+
382+
// call the service.
383+
StepVerifier.create(responses)
384+
.expectErrorMessage("IllegalArgumentException")
385+
.verify(Duration.ofSeconds(3));
386+
}
387+
372388
@Test
373389
public void test_remote_bidi_greeting_expect_NotAuthorized() {
374390

@@ -389,6 +405,25 @@ public void test_remote_bidi_greeting_expect_NotAuthorized() {
389405
.verify(Duration.ofSeconds(3));
390406
}
391407

408+
@Test
409+
public void test_remote_bidi_greeting_message_expect_NotAuthorized() {
410+
411+
// get a proxy to the service api.
412+
GreetingService service = createProxy();
413+
414+
DirectProcessor<GreetingRequest> requests = DirectProcessor.create();
415+
416+
// call the service.
417+
Flux<GreetingResponse> responses = service.bidiGreetingNotAuthorizedMessage(
418+
requests.map(request -> ServiceMessage.builder().data(request).build()))
419+
.map(ServiceMessage::data);
420+
421+
StepVerifier.create(responses)
422+
.then(() -> requests.onNext(new GreetingRequest("joe-1")))
423+
.expectErrorMessage("Not authorized")
424+
.verify(Duration.ofSeconds(3));
425+
}
426+
392427
@Test
393428
public void test_remote_bidi_greeting_expect_GreetingResponse() {
394429

@@ -415,6 +450,31 @@ public void test_remote_bidi_greeting_expect_GreetingResponse() {
415450
.verify(Duration.ofSeconds(3));
416451
}
417452

453+
@Test
454+
public void test_remote_bidi_greeting_message_expect_GreetingResponse() {
455+
456+
// get a proxy to the service api.
457+
GreetingService service = createProxy();
458+
459+
UnicastProcessor<GreetingRequest> requests = UnicastProcessor.create();
460+
461+
// call the service.
462+
Flux<GreetingResponse> responses = service.bidiGreetingMessage(requests
463+
.map(request -> ServiceMessage.builder().data(request).build()))
464+
.map(ServiceMessage::data);
465+
466+
StepVerifier.create(responses)
467+
.then(() -> requests.onNext(new GreetingRequest("joe-1")))
468+
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-1"))
469+
.then(() -> requests.onNext(new GreetingRequest("joe-2")))
470+
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-2"))
471+
.then(() -> requests.onNext(new GreetingRequest("joe-3")))
472+
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-3"))
473+
.then(() -> requests.onComplete())
474+
.expectComplete()
475+
.verify(Duration.ofSeconds(3));
476+
}
477+
418478
@Test
419479
public void test_services_contribute_to_cluster_metadata() {
420480
Map<String, String> tags = new HashMap<>();

services/src/test/java/io/scalecube/services/sut/GreetingService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,27 @@ public interface GreetingService {
7070
@ServiceMethod
7171
Flux<GreetingResponse> bidiGreeting(Publisher<GreetingRequest> request);
7272

73+
@ServiceMethod
74+
@RequestType(GreetingRequest.class)
75+
@ResponseType(GreetingResponse.class)
76+
Flux<ServiceMessage> bidiGreetingMessage(Publisher<ServiceMessage> requests);
77+
7378
@ServiceMethod
7479
Flux<GreetingResponse> bidiGreetingNotAuthorized(Flux<GreetingRequest> request);
7580

81+
@ServiceMethod
82+
@RequestType(GreetingRequest.class)
83+
@ResponseType(GreetingResponse.class)
84+
Flux<ServiceMessage> bidiGreetingNotAuthorizedMessage(Publisher<ServiceMessage> requests);
85+
7686
@ServiceMethod
7787
Flux<GreetingResponse> bidiGreetingIllegalArgumentException(Publisher<GreetingRequest> request);
7888

89+
@ServiceMethod
90+
@RequestType(GreetingRequest.class)
91+
@ResponseType(GreetingResponse.class)
92+
Flux<ServiceMessage> bidiGreetingIllegalArgumentExceptionMessage(Publisher<ServiceMessage> requests);
93+
7994
@ServiceMethod
8095
Mono<GreetingResponse> greetingMonoEmpty(GreetingRequest request);
8196

services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,36 @@ public Flux<GreetingResponse> bidiGreeting(Publisher<GreetingRequest> request) {
7878
.map(onNext -> new GreetingResponse(" hello to: " + onNext.getName(), "" + instanceId));
7979
}
8080

81+
@Override
82+
public Flux<ServiceMessage> bidiGreetingMessage(Publisher<ServiceMessage> requests) {
83+
return Flux.from(requests).map(request -> {
84+
GreetingRequest data = request.data();
85+
GreetingResponse resp = new GreetingResponse(" hello to: " + data.getName(), "1");
86+
return ServiceMessage.builder().data(resp).build();
87+
});
88+
}
89+
8190
@Override
8291
public Flux<GreetingResponse> bidiGreetingNotAuthorized(Flux<GreetingRequest> request) {
8392
return Flux.error(new ForbiddenException("Not authorized"));
8493
}
8594

95+
@Override
96+
public Flux<ServiceMessage> bidiGreetingNotAuthorizedMessage(Publisher<ServiceMessage> requests) {
97+
return Flux.error(new ForbiddenException("Not authorized"));
98+
}
99+
86100
@Override
87101
public Flux<GreetingResponse> bidiGreetingIllegalArgumentException(
88102
Publisher<GreetingRequest> request) {
89103
throw new IllegalArgumentException("IllegalArgumentException");
90104
}
91105

106+
@Override
107+
public Flux<ServiceMessage> bidiGreetingIllegalArgumentExceptionMessage(Publisher<ServiceMessage> requests) {
108+
throw new IllegalArgumentException("IllegalArgumentException");
109+
}
110+
92111
@Override
93112
public Mono<GreetingResponse> greetingMonoEmpty(GreetingRequest request) {
94113
return Mono.empty();

0 commit comments

Comments
 (0)