Skip to content

Commit 062a8a6

Browse files
committed
WIP on tests for BeforeDestroy and AfterConstruct
1 parent 1be2d3a commit 062a8a6

File tree

14 files changed

+186
-125
lines changed

14 files changed

+186
-125
lines changed

services-api/src/main/java/io/scalecube/services/Reflect.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.function.Function;
2828
import java.util.stream.Collectors;
2929
import org.reactivestreams.Publisher;
30+
import reactor.core.Exceptions;
3031
import reactor.core.publisher.Flux;
3132
import reactor.core.publisher.Mono;
3233

@@ -133,6 +134,7 @@ public static boolean isRequestTypeServiceMessage(Method method) {
133134
* @param object to inspect
134135
* @return the parameterized Type of a given object or Object class if unknown.
135136
*/
137+
@SuppressWarnings("unused")
136138
public static Type parameterizedType(Object object) {
137139
if (object != null) {
138140
Type type = object.getClass().getGenericSuperclass();
@@ -269,7 +271,7 @@ public static String methodName(Method method) {
269271
*
270272
* @param serviceInterface service interface to get qualifier for
271273
* @param method service's method to get qualifier for
272-
* @return
274+
* @return qualifier string
273275
*/
274276
public static String qualifier(Class<?> serviceInterface, Method method) {
275277
return Qualifier.asString(Reflect.serviceName(serviceInterface), Reflect.methodName(method));
@@ -387,10 +389,13 @@ private static boolean isRequestChannel(Method method) {
387389
|| Publisher.class.isAssignableFrom(reqTypes[0]));
388390
}
389391

390-
public static void setField(Field field, Object object, Object value)
391-
throws IllegalAccessException {
392+
public static void setField(Field field, Object object, Object value) {
392393
field.setAccessible(true);
393-
field.set(object, value);
394+
try {
395+
field.set(object, value);
396+
} catch (IllegalAccessException e) {
397+
throw Exceptions.propagate(e);
398+
}
394399
}
395400

396401
public static boolean isService(Class<?> type) {

services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.lang.reflect.Method;
1212
import java.lang.reflect.Parameter;
1313
import java.util.Optional;
14-
import java.util.StringJoiner;
1514
import java.util.function.Consumer;
1615
import java.util.stream.Collectors;
1716
import java.util.stream.Stream;
@@ -43,7 +42,7 @@ public final class ServiceMethodInvoker {
4342
* @param errorMapper error mapper
4443
* @param dataDecoder data decoder
4544
*/
46-
@SuppressWarnings("unchecked")
45+
@SuppressWarnings({"unchecked", "rawtypes"})
4746
public ServiceMethodInvoker(
4847
Method method,
4948
Object service,
@@ -202,24 +201,12 @@ private void applyRequestReleaser(ServiceMessage request, Consumer<Object> reque
202201
}
203202
}
204203

205-
/**
206-
* Shortened version of {@code toString} method.
207-
*
208-
* @return service method invoker as string
209-
*/
210-
public String asString() {
211-
return new StringJoiner(", ", ServiceMethodInvoker.class.getSimpleName() + "[", "]")
212-
.add("methodInfo=" + methodInfo.asString())
213-
.add(
214-
"serviceMethod='"
215-
+ service.getClass().getCanonicalName()
216-
+ "."
217-
+ method.getName()
218-
+ "("
219-
+ methodInfo.parameterCount()
220-
+ ")"
221-
+ "'")
222-
.toString();
204+
public Object service() {
205+
return service;
206+
}
207+
208+
public MethodInfo methodInfo() {
209+
return methodInfo;
223210
}
224211

225212
@Override
@@ -232,8 +219,4 @@ public String toString() {
232219
.collect(Collectors.joining(", ", "(", ")"));
233220
return classAndMethod + args;
234221
}
235-
236-
public Object getService() {
237-
return service;
238-
}
239222
}

services-api/src/main/java/io/scalecube/services/methods/ServiceMethodRegistry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
public interface ServiceMethodRegistry {
99

10+
@SuppressWarnings("rawtypes")
1011
void registerService(
1112
Object serviceInstance,
1213
ServiceProviderErrorMapper errorMapper,

services/src/main/java/io/scalecube/services/Injector.java

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,18 @@
22

33
import io.scalecube.services.annotations.AfterConstruct;
44
import io.scalecube.services.annotations.BeforeDestroy;
5+
import io.scalecube.services.annotations.Inject;
56
import io.scalecube.services.routing.Router;
67
import java.lang.annotation.Annotation;
78
import java.lang.reflect.Field;
89
import java.lang.reflect.Method;
910
import java.util.Arrays;
1011
import java.util.Collection;
11-
import org.slf4j.Logger;
12-
import org.slf4j.LoggerFactory;
12+
import reactor.core.Exceptions;
1313

1414
/** Service Injector scan and injects beans to a given Microservices instance. */
1515
final class Injector {
1616

17-
private static final Logger LOGGER = LoggerFactory.getLogger(Injector.class);
18-
1917
private Injector() {
2018
// Do not instantiate
2119
}
@@ -38,31 +36,17 @@ public static Microservices inject(Microservices microservices, Collection<Objec
3836
}
3937

4038
private static void injectField(Microservices microservices, Field field, Object service) {
41-
try {
42-
if (field.isAnnotationPresent(io.scalecube.services.annotations.Inject.class)
43-
&& field.getType().equals(Microservices.class)) {
44-
Reflect.setField(field, service, microservices);
45-
} else if (field.isAnnotationPresent(io.scalecube.services.annotations.Inject.class)
46-
&& Reflect.isService(field.getType())) {
47-
io.scalecube.services.annotations.Inject injection =
48-
field.getAnnotation(io.scalecube.services.annotations.Inject.class);
49-
Class<? extends Router> routerClass = injection.router();
50-
51-
final ServiceCall call = microservices.call();
52-
53-
if (!routerClass.isInterface()) {
54-
call.router(routerClass);
55-
}
56-
57-
final Object targetProxy = call.api(field.getType());
58-
59-
Reflect.setField(field, service, targetProxy);
39+
if (field.isAnnotationPresent(Inject.class) && field.getType().equals(Microservices.class)) {
40+
Reflect.setField(field, service, microservices);
41+
} else if (field.isAnnotationPresent(Inject.class) && Reflect.isService(field.getType())) {
42+
Inject injection = field.getAnnotation(Inject.class);
43+
Class<? extends Router> routerClass = injection.router();
44+
final ServiceCall call = microservices.call();
45+
if (!routerClass.isInterface()) {
46+
call.router(routerClass);
6047
}
61-
} catch (Exception ex) {
62-
LOGGER.error(
63-
"failed to set service proxy of type: {} reason:{}",
64-
service.getClass().getName(),
65-
ex.getMessage());
48+
final Object targetProxy = call.api(field.getType());
49+
Reflect.setField(field, service, targetProxy);
6650
}
6751
}
6852

@@ -98,7 +82,7 @@ private static <A extends Annotation> void processMethodWithAnnotation(
9882
.toArray();
9983
targetMethod.invoke(targetInstance, parameters);
10084
} catch (Exception ex) {
101-
throw new RuntimeException(ex);
85+
throw Exceptions.propagate(ex);
10286
}
10387
});
10488
}

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.Optional;
31+
import java.util.StringJoiner;
3132
import java.util.UUID;
3233
import java.util.concurrent.CopyOnWriteArrayList;
3334
import java.util.function.Function;
@@ -274,11 +275,7 @@ public ServiceDiscovery discovery() {
274275
* @return result of shutdown
275276
*/
276277
public Mono<Void> shutdown() {
277-
return Mono.defer(
278-
() -> {
279-
shutdown.onComplete();
280-
return onShutdown;
281-
});
278+
return Mono.fromRunnable(shutdown::onComplete).then(onShutdown);
282279
}
283280

284281
/**
@@ -310,14 +307,11 @@ private Mono<Void> doShutdown() {
310307
}
311308

312309
private Mono<Void> processBeforeDestroy() {
313-
return Mono.fromSupplier(
314-
() -> {
315-
methodRegistry.listInvokers().stream()
316-
.map(ServiceMethodInvoker::getService)
317-
.distinct()
318-
.forEach(service -> Injector.processBeforeDestroy(this, service));
319-
return null;
320-
});
310+
return Mono.whenDelayError(
311+
methodRegistry.listInvokers().stream()
312+
.map(ServiceMethodInvoker::service)
313+
.map(s -> Mono.fromRunnable(() -> Injector.processBeforeDestroy(this, s)))
314+
.collect(Collectors.toList()));
321315
}
322316

323317
public static final class Builder {
@@ -631,8 +625,7 @@ private Mono<ServiceTransportBootstrap> start(ServiceMethodRegistry methodRegist
631625

632626
// create client transport
633627
this.clientTransport = serviceTransport.clientTransport();
634-
LOGGER.debug(
635-
"Successfully created ClientTransport -- {}", this.clientTransport);
628+
LOGGER.debug("Successfully created ClientTransport -- {}", this.clientTransport);
636629
return this;
637630
});
638631
}
@@ -667,8 +660,7 @@ private static class JmxMonitorMBean implements MonitorMBean {
667660
private static JmxMonitorMBean start(Microservices instance) throws Exception {
668661
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
669662
JmxMonitorMBean jmxMBean = new JmxMonitorMBean(instance);
670-
ObjectName objectName =
671-
new ObjectName("io.scalecube.services:name=" + instance.toString());
663+
ObjectName objectName = new ObjectName("io.scalecube.services:name=" + instance.toString());
672664
StandardMBean standardMBean = new StandardMBean(jmxMBean, MonitorMBean.class);
673665
mbeanServer.registerMBean(standardMBean, objectName);
674666
return jmxMBean;
@@ -693,8 +685,23 @@ public String getAllServiceEndpoints() {
693685
@Override
694686
public String getServiceMethodInvokers() {
695687
return microservices.methodRegistry.listInvokers().stream()
696-
.map(ServiceMethodInvoker::asString)
688+
.map(JmxMonitorMBean::asString)
697689
.collect(Collectors.joining(",", "[", "]"));
698690
}
691+
692+
private static String asString(ServiceMethodInvoker invoker) {
693+
return new StringJoiner(", ", ServiceMethodInvoker.class.getSimpleName() + "[", "]")
694+
.add("methodInfo=" + invoker.methodInfo().asString())
695+
.add(
696+
"serviceMethod='"
697+
+ invoker.service().getClass().getCanonicalName()
698+
+ "."
699+
+ invoker.methodInfo().methodName()
700+
+ "("
701+
+ invoker.methodInfo().parameterCount()
702+
+ ")"
703+
+ "'")
704+
.toString();
705+
}
699706
}
700707
}

services/src/main/java/io/scalecube/services/methods/ServiceMethodRegistryImpl.java

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import io.scalecube.services.auth.Authenticator;
55
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
66
import io.scalecube.services.transport.api.ServiceMessageDataDecoder;
7+
import java.lang.reflect.Method;
78
import java.util.ArrayList;
89
import java.util.List;
10+
import java.util.Map;
911
import java.util.concurrent.ConcurrentHashMap;
1012
import java.util.concurrent.ConcurrentMap;
1113

@@ -14,6 +16,7 @@ public final class ServiceMethodRegistryImpl implements ServiceMethodRegistry {
1416
private final ConcurrentMap<String, ServiceMethodInvoker> methodInvokers =
1517
new ConcurrentHashMap<>();
1618

19+
@SuppressWarnings("rawtypes")
1720
@Override
1821
public void registerService(
1922
Object serviceInstance,
@@ -22,43 +25,43 @@ public void registerService(
2225
Authenticator authenticator) {
2326
Reflect.serviceInterfaces(serviceInstance)
2427
.forEach(
25-
serviceInterface ->
26-
Reflect.serviceMethods(serviceInterface)
27-
.forEach(
28-
(key, method) -> {
28+
serviceInterface -> {
29+
Map<String, Method> serviceMethods = Reflect.serviceMethods(serviceInterface);
30+
serviceMethods.forEach(
31+
(key, method) -> {
2932

30-
// validate method
31-
Reflect.validateMethodOrThrow(method);
33+
// validate method
34+
Reflect.validateMethodOrThrow(method);
3235

33-
MethodInfo methodInfo =
34-
new MethodInfo(
35-
Reflect.serviceName(serviceInterface),
36-
Reflect.methodName(method),
37-
Reflect.parameterizedReturnType(method),
38-
Reflect.isReturnTypeServiceMessage(method),
39-
Reflect.communicationMode(method),
40-
method.getParameterCount(),
41-
Reflect.requestType(method),
42-
Reflect.isRequestTypeServiceMessage(method),
43-
Reflect.isAuth(method));
36+
MethodInfo methodInfo =
37+
new MethodInfo(
38+
Reflect.serviceName(serviceInterface),
39+
Reflect.methodName(method),
40+
Reflect.parameterizedReturnType(method),
41+
Reflect.isReturnTypeServiceMessage(method),
42+
Reflect.communicationMode(method),
43+
method.getParameterCount(),
44+
Reflect.requestType(method),
45+
Reflect.isRequestTypeServiceMessage(method),
46+
Reflect.isAuth(method));
4447

45-
// register new service method invoker
46-
String qualifier = methodInfo.qualifier();
47-
if (methodInvokers.containsKey(qualifier)) {
48-
throw new IllegalStateException(
49-
String.format(
50-
"MethodInvoker for api '%s' already exists", qualifier));
51-
}
52-
ServiceMethodInvoker invoker =
53-
new ServiceMethodInvoker(
54-
method,
55-
serviceInstance,
56-
methodInfo,
57-
errorMapper,
58-
dataDecoder,
59-
authenticator);
60-
methodInvokers.put(methodInfo.qualifier(), invoker);
61-
}));
48+
// register new service method invoker
49+
String qualifier = methodInfo.qualifier();
50+
if (methodInvokers.containsKey(qualifier)) {
51+
throw new IllegalStateException(
52+
String.format("MethodInvoker for api '%s' already exists", qualifier));
53+
}
54+
ServiceMethodInvoker invoker =
55+
new ServiceMethodInvoker(
56+
method,
57+
serviceInstance,
58+
methodInfo,
59+
errorMapper,
60+
dataDecoder,
61+
authenticator);
62+
methodInvokers.put(methodInfo.qualifier(), invoker);
63+
});
64+
});
6265
}
6366

6467
@Override

services/src/test/java/io/scalecube/services/ErrorFlowTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.reactivestreams.Publisher;
2121
import reactor.test.StepVerifier;
2222

23-
public class ErrorFlowTest {
23+
public class ErrorFlowTest extends BaseTest {
2424

2525
private static AtomicInteger port = new AtomicInteger(4000);
2626
private static Microservices provider;

services/src/test/java/io/scalecube/services/ServiceAuthLocalTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import reactor.core.publisher.Mono;
2121
import reactor.test.StepVerifier;
2222

23-
final class ServiceAuthLocalTest {
23+
final class ServiceAuthLocalTest extends BaseTest {
2424

2525
private static final Duration TIMEOUT = Duration.ofSeconds(3);
2626

0 commit comments

Comments
 (0)