Skip to content

Commit 251e7d5

Browse files
authored
Fixed audit logs issue (#894)
* Fixed audit logs issue * Get rid of FIRE-AND-FORGET CommunicationMode
1 parent 24e023b commit 251e7d5

File tree

5 files changed

+41
-30
lines changed

5 files changed

+41
-30
lines changed
Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package io.scalecube.services;
22

33
public enum CommunicationMode {
4-
/** Corresponds to <code>Mono&lt;Void&gt; action(REQ)</code>. */
5-
FIRE_AND_FORGET,
6-
/** Corresponds to <code>Mono&lt;RESP&gt; action(REQ)</code>. */
4+
5+
/**
6+
* Corresponds to {@code Mono<Response> action(Request)} or {@code Mono<Void> action(Request)}.
7+
*/
78
REQUEST_RESPONSE,
8-
/** Corresponds to <code>Flux&lt;RESP&gt; action(REQ)</code>. */
9+
10+
/** Corresponds to {@code Flux<OutputData> action(Request)}. */
911
REQUEST_STREAM,
10-
/** Corresponds to <code>Flux&lt;RESP&gt; action(Flux&lt;REQ&gt;)</code>. */
12+
13+
/** Corresponds to {@code Flux<OutputData> action(Flux<InputData>)}. */
1114
REQUEST_CHANNEL;
1215
}

services-api/src/main/java/io/scalecube/services/Reflect.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.scalecube.services;
22

3-
import static io.scalecube.services.CommunicationMode.FIRE_AND_FORGET;
43
import static io.scalecube.services.CommunicationMode.REQUEST_CHANNEL;
54
import static io.scalecube.services.CommunicationMode.REQUEST_RESPONSE;
65
import static io.scalecube.services.CommunicationMode.REQUEST_STREAM;
@@ -335,28 +334,25 @@ private static void validateRequestType(Method method) {
335334
* <p>The following modes are supported:
336335
*
337336
* <ul>
338-
* <li>{@link CommunicationMode#REQUEST_CHANNEL} - service has at least one parameter,and the
339-
* first parameter is either of type return type {@link Flux} or {@link Publisher};
340-
* <li>{@link CommunicationMode#REQUEST_STREAM} - service's return type is {@link Flux}, and
341-
* parameter is not {@link Flux};
342-
* <li>{@link CommunicationMode#REQUEST_RESPONSE} - service's return type is Mono;
343-
* <li>{@link CommunicationMode#FIRE_AND_FORGET} - service returns void;
337+
* <li>{@link CommunicationMode#REQUEST_CHANNEL} - service has at least one parameter, and the
338+
* first parameter is either of {@link Flux} or {@link Publisher}.
339+
* <li>{@link CommunicationMode#REQUEST_STREAM} - service return type is {@link Flux}, and
340+
* parameter is not {@link Flux}.
341+
* <li>{@link CommunicationMode#REQUEST_RESPONSE} - service return type is either {@code
342+
* Mono<Pojo>} or {@code Mono<Void>}.
344343
* </ul>
345344
*
346-
* @param method - Service method to be analyzed.
347-
* @return - {@link CommunicationMode} of service method. If method does not correspond to any of
348-
* supported modes, throws {@link IllegalArgumentException}
345+
* @param method service method
346+
* @return {@link CommunicationMode} of service method, or throws {@link IllegalArgumentException}
349347
*/
350348
public static CommunicationMode communicationMode(Method method) {
351349
Class<?> returnType = method.getReturnType();
352350
if (isRequestChannel(method)) {
353351
return REQUEST_CHANNEL;
354352
} else if (returnType.isAssignableFrom(Flux.class)) {
355353
return REQUEST_STREAM;
356-
} else if (returnType.isAssignableFrom(Mono.class)) {
354+
} else if (returnType.isAssignableFrom(Mono.class) || returnType.isAssignableFrom(Void.TYPE)) {
357355
return REQUEST_RESPONSE;
358-
} else if (returnType.isAssignableFrom(Void.TYPE)) {
359-
return FIRE_AND_FORGET;
360356
} else {
361357
throw new IllegalArgumentException(
362358
"Service method is not supported (check return type or parameter type): " + method);

services-api/src/main/java/io/scalecube/services/ServiceCall.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType
225225
.doOnError(
226226
ex -> {
227227
if (logger != null) {
228-
logger.error("[{}] request: {}", request.qualifier(), request, ex);
228+
logger.error("[{}][error] request: {}", request.qualifier(), request, ex);
229229
}
230230
});
231231
}
@@ -270,13 +270,19 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseTyp
270270
.doOnSubscribe(
271271
s -> {
272272
if (logger != null && logger.isDebugEnabled()) {
273-
logger.debug("[{}] request: {}", request.qualifier(), request);
273+
logger.debug("[{}][subscribe] request: {}", request.qualifier(), request);
274+
}
275+
})
276+
.doOnComplete(
277+
() -> {
278+
if (logger != null && logger.isDebugEnabled()) {
279+
logger.debug("[{}][complete] request: {}", request.qualifier(), request);
274280
}
275281
})
276282
.doOnError(
277283
ex -> {
278284
if (logger != null) {
279-
logger.error("[{}] request: {}", request.qualifier(), request, ex);
285+
logger.error("[{}][error] request: {}", request.qualifier(), request, ex);
280286
}
281287
});
282288
}
@@ -354,9 +360,6 @@ public <T> T api(Class<T> serviceInterface) {
354360

355361
//noinspection EnhancedSwitchMigration
356362
switch (methodInfo.communicationMode()) {
357-
case FIRE_AND_FORGET:
358-
return serviceCall.oneWay(toServiceMessage(methodInfo, request));
359-
360363
case REQUEST_RESPONSE:
361364
return serviceCall
362365
.requestOne(toServiceMessage(methodInfo, request), returnType)

services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static io.scalecube.services.auth.Authenticator.AUTH_CONTEXT_KEY;
44
import static io.scalecube.services.auth.Authenticator.NULL_AUTH_CONTEXT;
55

6+
import io.scalecube.services.CommunicationMode;
67
import io.scalecube.services.api.ServiceMessage;
78
import io.scalecube.services.auth.Authenticator;
89
import io.scalecube.services.auth.PrincipalMapper;
@@ -84,7 +85,7 @@ private Mono<?> invokeOne(ServiceMessage message, Object authData) {
8485
.doOnError(
8586
ex -> {
8687
if (logger != null) {
87-
logger.error("[{}] request: {}", qualifier, request, ex);
88+
logger.error("[{}][error] request: {}", qualifier, request, ex);
8889
}
8990
});
9091
})
@@ -99,6 +100,9 @@ private Mono<?> invokeOne(ServiceMessage message, Object authData) {
99100
* @return flux of service messages
100101
*/
101102
public Flux<ServiceMessage> invokeMany(ServiceMessage message) {
103+
if (methodInfo.communicationMode() == CommunicationMode.REQUEST_RESPONSE) {
104+
return Flux.from(invokeOne(message));
105+
}
102106
return Mono.deferContextual(context -> authenticate(message, (Context) context))
103107
.flatMapMany(authData -> invokeMany(message, authData))
104108
.map(response -> toResponse(response, message.qualifier(), message.dataFormat()))
@@ -116,13 +120,19 @@ private Flux<?> invokeMany(ServiceMessage message, Object authData) {
116120
.doOnSubscribe(
117121
s -> {
118122
if (logger != null && logger.isDebugEnabled()) {
119-
logger.debug("[{}] request: {}", qualifier, request);
123+
logger.debug("[{}][subscribe] request: {}", qualifier, request);
124+
}
125+
})
126+
.doOnComplete(
127+
() -> {
128+
if (logger != null && logger.isDebugEnabled()) {
129+
logger.debug("[{}][complete] request: {}", qualifier, request);
120130
}
121131
})
122132
.doOnError(
123133
ex -> {
124134
if (logger != null) {
125-
logger.error("[{}] request: {}", qualifier, request, ex);
135+
logger.error("[{}][error] request: {}", qualifier, request, ex);
126136
}
127137
});
128138
})

services-api/src/test/java/io/scalecube/services/methods/ReflectTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.scalecube.services.methods;
22

3-
import static io.scalecube.services.CommunicationMode.FIRE_AND_FORGET;
43
import static io.scalecube.services.CommunicationMode.REQUEST_CHANNEL;
54
import static io.scalecube.services.CommunicationMode.REQUEST_RESPONSE;
65
import static io.scalecube.services.CommunicationMode.REQUEST_STREAM;
@@ -45,12 +44,12 @@ public void testCommunicationMode(String methodName, CommunicationMode expectedM
4544

4645
static Stream<Arguments> argsCommunicationModeProvider() {
4746
return Stream.of(
48-
Arguments.of("fireAndForget", FIRE_AND_FORGET),
47+
Arguments.of("fireAndForget", REQUEST_RESPONSE),
4948
Arguments.of("emptyResponse", REQUEST_RESPONSE),
5049
Arguments.of("requestResponse", REQUEST_RESPONSE),
5150
Arguments.of("requestStream", REQUEST_STREAM),
5251
Arguments.of("requestChannel", REQUEST_CHANNEL),
53-
Arguments.of("fireAndForgetMessage", FIRE_AND_FORGET),
52+
Arguments.of("fireAndForgetMessage", REQUEST_RESPONSE),
5453
Arguments.of("emptyResponseMessage", REQUEST_RESPONSE),
5554
Arguments.of("requestResponseMessage", REQUEST_RESPONSE),
5655
Arguments.of("requestStreamMessage", REQUEST_STREAM),

0 commit comments

Comments
 (0)