Skip to content

Commit c4ce6db

Browse files
authored
Support of file transfer (service to client) (#874)
1 parent 2bb0398 commit c4ce6db

File tree

36 files changed

+1537
-236
lines changed

36 files changed

+1537
-236
lines changed

services-api/src/main/java/io/scalecube/services/Reflect.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import io.scalecube.services.annotations.Service;
1313
import io.scalecube.services.annotations.ServiceMethod;
1414
import io.scalecube.services.annotations.Tag;
15-
import io.scalecube.services.api.Qualifier;
1615
import io.scalecube.services.api.ServiceMessage;
1716
import io.scalecube.services.auth.Secured;
1817
import io.scalecube.services.methods.MethodInfo;
@@ -284,17 +283,6 @@ public static String restMethod(Method method) {
284283
return methodAnnotation != null ? methodAnnotation.value() : null;
285284
}
286285

287-
/**
288-
* Handy method to get qualifier String from service's interface and method.
289-
*
290-
* @param serviceInterface service interface to get qualifier for
291-
* @param method service's method to get qualifier for
292-
* @return qualifier string
293-
*/
294-
public static String qualifier(Class<?> serviceInterface, Method method) {
295-
return Qualifier.asString(Reflect.serviceName(serviceInterface), Reflect.methodName(method));
296-
}
297-
298286
/**
299287
* Util function to perform basic validation of service message request.
300288
*
@@ -392,7 +380,11 @@ public static boolean isSecured(Method method) {
392380
}
393381

394382
public static Scheduler executeOnScheduler(Method method, Map<String, Scheduler> schedulers) {
395-
final Class<?> declaringClass = method.getDeclaringClass();
383+
if (schedulers == null) {
384+
return Schedulers.immediate();
385+
}
386+
387+
final var declaringClass = method.getDeclaringClass();
396388

397389
if (method.isAnnotationPresent(ExecuteOn.class)) {
398390
final var executeOn = method.getAnnotation(ExecuteOn.class);

services-api/src/main/java/io/scalecube/services/ServiceCall.java

Lines changed: 45 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
import io.scalecube.services.transport.api.ClientTransport;
1414
import java.lang.System.Logger;
1515
import java.lang.System.Logger.Level;
16-
import java.lang.reflect.InvocationHandler;
17-
import java.lang.reflect.Method;
1816
import java.lang.reflect.Proxy;
1917
import java.lang.reflect.Type;
2018
import java.util.Collections;
@@ -192,7 +190,7 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType
192190
() -> {
193191
ServiceMethodInvoker methodInvoker;
194192
if (serviceRegistry != null
195-
&& (methodInvoker = serviceRegistry.getInvoker(request)) != null) {
193+
&& (methodInvoker = serviceRegistry.lookupInvoker(request)) != null) {
196194
// local service
197195
return methodInvoker.invokeOne(request).map(this::throwIfError);
198196
} else {
@@ -246,7 +244,7 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseTyp
246244
() -> {
247245
ServiceMethodInvoker methodInvoker;
248246
if (serviceRegistry != null
249-
&& (methodInvoker = serviceRegistry.getInvoker(request)) != null) {
247+
&& (methodInvoker = serviceRegistry.lookupInvoker(request)) != null) {
250248
// local service
251249
return methodInvoker.invokeMany(request).map(this::throwIfError);
252250
} else {
@@ -301,7 +299,7 @@ public Flux<ServiceMessage> requestBidirectional(
301299
ServiceMessage request = first.get();
302300
ServiceMethodInvoker methodInvoker;
303301
if (serviceRegistry != null
304-
&& (methodInvoker = serviceRegistry.getInvoker(request)) != null) {
302+
&& (methodInvoker = serviceRegistry.lookupInvoker(request)) != null) {
305303
// local service
306304
return methodInvoker.invokeBidirectional(messages).map(this::throwIfError);
307305
} else {
@@ -328,59 +326,53 @@ public Flux<ServiceMessage> requestBidirectional(
328326
*/
329327
@SuppressWarnings("unchecked")
330328
public <T> T api(Class<T> serviceInterface) {
331-
332-
final ServiceCall serviceCall = this;
333-
final Map<Method, MethodInfo> genericReturnTypes = Reflect.methodsInfo(serviceInterface);
334-
335-
// noinspection unchecked,Convert2Lambda
336329
return (T)
337330
Proxy.newProxyInstance(
338331
getClass().getClassLoader(),
339332
new Class[] {serviceInterface},
340-
new InvocationHandler() {
341-
@Override
342-
public Object invoke(Object proxy, Method method, Object[] params) {
343-
Optional<Object> check =
344-
toStringOrEqualsOrHashCode(method.getName(), serviceInterface, params);
345-
if (check.isPresent()) {
346-
return check.get(); // toString, hashCode was invoked.
347-
}
333+
(proxy, method, params) -> {
334+
Optional<Object> check =
335+
toStringOrEqualsOrHashCode(method.getName(), serviceInterface, params);
336+
if (check.isPresent()) {
337+
return check.get(); // toString, hashCode was invoked.
338+
}
348339

349-
final MethodInfo methodInfo = genericReturnTypes.get(method);
350-
final Type returnType = methodInfo.parameterizedReturnType();
351-
final boolean isServiceMessage = methodInfo.isReturnTypeServiceMessage();
352-
353-
Object request = methodInfo.requestType() == Void.TYPE ? null : params[0];
354-
355-
switch (methodInfo.communicationMode()) {
356-
case FIRE_AND_FORGET:
357-
return serviceCall.oneWay(toServiceMessage(methodInfo, request));
358-
359-
case REQUEST_RESPONSE:
360-
return serviceCall
361-
.requestOne(toServiceMessage(methodInfo, request), returnType)
362-
.transform(asMono(isServiceMessage));
363-
364-
case REQUEST_STREAM:
365-
return serviceCall
366-
.requestMany(toServiceMessage(methodInfo, request), returnType)
367-
.transform(asFlux(isServiceMessage));
368-
369-
case REQUEST_CHANNEL:
370-
// this is REQUEST_CHANNEL so it means params[0] must
371-
// be a publisher - its safe to cast.
372-
//noinspection rawtypes
373-
return serviceCall
374-
.requestBidirectional(
375-
Flux.from((Publisher) request)
376-
.map(data -> toServiceMessage(methodInfo, data)),
377-
returnType)
378-
.transform(asFlux(isServiceMessage));
379-
380-
default:
381-
throw new IllegalArgumentException(
382-
"Communication mode is not supported: " + method);
383-
}
340+
final var serviceCall = ServiceCall.this;
341+
final var genericReturnTypes = Reflect.methodsInfo(serviceInterface);
342+
final var methodInfo = genericReturnTypes.get(method);
343+
final var returnType = methodInfo.parameterizedReturnType();
344+
final var isServiceMessage = methodInfo.isReturnTypeServiceMessage();
345+
final var request = methodInfo.requestType() == Void.TYPE ? null : params[0];
346+
347+
//noinspection EnhancedSwitchMigration
348+
switch (methodInfo.communicationMode()) {
349+
case FIRE_AND_FORGET:
350+
return serviceCall.oneWay(toServiceMessage(methodInfo, request));
351+
352+
case REQUEST_RESPONSE:
353+
return serviceCall
354+
.requestOne(toServiceMessage(methodInfo, request), returnType)
355+
.transform(asMono(isServiceMessage));
356+
357+
case REQUEST_STREAM:
358+
return serviceCall
359+
.requestMany(toServiceMessage(methodInfo, request), returnType)
360+
.transform(asFlux(isServiceMessage));
361+
362+
case REQUEST_CHANNEL:
363+
// this is REQUEST_CHANNEL so it means params[0] must
364+
// be a publisher - its safe to cast.
365+
//noinspection rawtypes
366+
return serviceCall
367+
.requestBidirectional(
368+
Flux.from((Publisher) request)
369+
.map(data -> toServiceMessage(methodInfo, data)),
370+
returnType)
371+
.transform(asFlux(isServiceMessage));
372+
373+
default:
374+
throw new IllegalArgumentException(
375+
"Communication mode is not supported: " + method);
384376
}
385377
});
386378
}

services-api/src/main/java/io/scalecube/services/ServiceReference.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.scalecube.services;
22

3+
import static io.scalecube.services.api.DynamicQualifier.isDynamicQualifier;
4+
35
import io.scalecube.services.api.DynamicQualifier;
46
import io.scalecube.services.api.Qualifier;
57
import java.util.Collections;
@@ -40,7 +42,7 @@ public ServiceReference(
4042
this.namespace = serviceRegistration.namespace();
4143
this.action = serviceMethodDefinition.action();
4244
this.qualifier = Qualifier.asString(namespace, action);
43-
this.dynamicQualifier = qualifier.contains(":") ? new DynamicQualifier(qualifier) : null;
45+
this.dynamicQualifier = isDynamicQualifier(qualifier) ? DynamicQualifier.from(qualifier) : null;
4446
this.contentTypes = Collections.unmodifiableSet(serviceEndpoint.contentTypes());
4547
this.tags = mergeTags(serviceMethodDefinition, serviceRegistration, serviceEndpoint);
4648
this.address = serviceEndpoint.address();

services-api/src/main/java/io/scalecube/services/ServiceScanner.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

services-api/src/main/java/io/scalecube/services/api/DynamicQualifier.java

Lines changed: 74 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,69 +9,125 @@
99
import java.util.StringJoiner;
1010
import java.util.regex.Pattern;
1111

12+
/**
13+
* Representation of dynamic qualifier. Being used in service method definitions along with normal
14+
* qualifiers. Typical example of dynamic qualifiers:
15+
*
16+
* <ul>
17+
* <li>v1/api/users/:userId
18+
* <li>v1/api/orders/:orderId/
19+
* <li>v1/api/categories/:categoryId/products/:productId
20+
* </ul>
21+
*/
1222
public final class DynamicQualifier {
1323

24+
private static final Pattern DYNAMIC_QUALIFIER_PATTERN = Pattern.compile("(^|/):\\w+(?:/|$)");
25+
1426
private final String qualifier;
1527
private final Pattern pattern;
1628
private final List<String> pathVariables;
1729
private final int size;
1830

19-
public DynamicQualifier(String qualifier) {
20-
if (!qualifier.contains(":")) {
21-
throw new IllegalArgumentException("Illegal dynamic qualifier: " + qualifier);
22-
}
31+
private DynamicQualifier(String qualifier) {
32+
final var list = new ArrayList<String>();
33+
final var builder = new StringBuilder();
2334

24-
final var pathVariables = new ArrayList<String>();
25-
final var sb = new StringBuilder();
2635
for (var s : qualifier.split("/")) {
2736
if (s.startsWith(":")) {
2837
final var pathVar = s.substring(1);
29-
sb.append("(?<").append(pathVar).append(">.*?)");
30-
pathVariables.add(pathVar);
38+
builder.append("(?<").append(pathVar).append(">.+)");
39+
list.add(pathVar);
3140
} else {
32-
sb.append(s);
41+
builder.append(s);
3342
}
34-
sb.append("/");
43+
builder.append("/");
3544
}
36-
sb.setLength(sb.length() - 1);
45+
builder.setLength(builder.length() - 1);
3746

3847
this.qualifier = qualifier;
39-
this.pattern = Pattern.compile(sb.toString());
40-
this.pathVariables = Collections.unmodifiableList(pathVariables);
48+
this.pattern = Pattern.compile(builder.toString());
49+
this.pathVariables = Collections.unmodifiableList(list);
4150
this.size = sizeOf(qualifier);
4251
}
4352

53+
/**
54+
* Creates new {@link DynamicQualifier} instance.
55+
*
56+
* @param qualifier qualifier
57+
* @return {@link DynamicQualifier} instance
58+
*/
59+
public static DynamicQualifier from(String qualifier) {
60+
return new DynamicQualifier(qualifier);
61+
}
62+
63+
/**
64+
* Returns whether given qualifier is dynamic qualifier or not.
65+
*
66+
* @param qualifier qualifier
67+
* @return result
68+
*/
69+
public static boolean isDynamicQualifier(String qualifier) {
70+
return DYNAMIC_QUALIFIER_PATTERN.matcher(qualifier).find();
71+
}
72+
73+
/**
74+
* Original qualifier.
75+
*
76+
* @return result
77+
*/
4478
public String qualifier() {
4579
return qualifier;
4680
}
4781

82+
/**
83+
* Compiled pattern.
84+
*
85+
* @return result
86+
*/
4887
public Pattern pattern() {
4988
return pattern;
5089
}
5190

91+
/**
92+
* Returns path variable names.
93+
*
94+
* @return path variable names
95+
*/
5296
public List<String> pathVariables() {
5397
return pathVariables;
5498
}
5599

100+
/**
101+
* Size of qualifier. This is a number of {@code /} symbols.
102+
*
103+
* @return result
104+
*/
56105
public int size() {
57106
return size;
58107
}
59108

60-
public Map<String, String> matchQualifier(String input) {
61-
if (size != sizeOf(input)) {
109+
/**
110+
* Matches input qualifier against this dynamic qualifier.
111+
*
112+
* @param qualifier qualifier
113+
* @return matched path variables key-value map, or null if no matching occurred
114+
*/
115+
public Map<String, String> matchQualifier(String qualifier) {
116+
if (size != sizeOf(qualifier)) {
62117
return null;
63118
}
64119

65-
final var matcher = pattern.matcher(input);
120+
final var matcher = pattern.matcher(qualifier);
66121
if (!matcher.matches()) {
67122
return null;
68123
}
69124

70125
final var map = new LinkedHashMap<String, String>();
71126
for (var pathVar : pathVariables) {
72127
final var value = matcher.group(pathVar);
73-
Objects.requireNonNull(
74-
value, "Path variable value must not be null, path variable: " + pathVar);
128+
if (value == null || value.isEmpty()) {
129+
throw new IllegalArgumentException("Wrong path variable: " + pathVar);
130+
}
75131
map.put(pathVar, value);
76132
}
77133

0 commit comments

Comments
 (0)