|
12 | 12 | import java.lang.reflect.Method; |
13 | 13 | import java.util.Collections; |
14 | 14 | import java.util.Map; |
| 15 | +import java.util.Objects; |
15 | 16 | import java.util.Optional; |
16 | 17 | import java.util.StringJoiner; |
17 | 18 | import org.reactivestreams.Publisher; |
@@ -52,11 +53,11 @@ public ServiceMethodInvoker( |
52 | 53 | ServiceMessageDataDecoder dataDecoder, |
53 | 54 | Authenticator authenticator, |
54 | 55 | PrincipalMapper<Object> principalMapper) { |
55 | | - this.method = method; |
56 | | - this.service = service; |
57 | | - this.methodInfo = methodInfo; |
58 | | - this.errorMapper = errorMapper; |
59 | | - this.dataDecoder = dataDecoder; |
| 56 | + this.method = Objects.requireNonNull(method, "method"); |
| 57 | + this.service = Objects.requireNonNull(service, "service"); |
| 58 | + this.methodInfo = Objects.requireNonNull(methodInfo, "methodInfo"); |
| 59 | + this.errorMapper = Objects.requireNonNull(errorMapper, "errorMapper"); |
| 60 | + this.dataDecoder = Objects.requireNonNull(dataDecoder, "dataDecoder"); |
60 | 61 | this.authenticator = authenticator; |
61 | 62 | this.principalMapper = principalMapper; |
62 | 63 | } |
@@ -105,18 +106,18 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis |
105 | 106 |
|
106 | 107 | private Mono<?> deferWithContextOne(ServiceMessage message, Map<String, String> authData) { |
107 | 108 | return Mono.deferWithContext(context -> Mono.from(invoke(toRequest(message)))) |
108 | | - .subscriberContext(context -> newPrincipalContext(authData)); |
| 109 | + .subscriberContext(context -> newPrincipalContext(authData, context)); |
109 | 110 | } |
110 | 111 |
|
111 | 112 | private Flux<?> deferWithContextMany(ServiceMessage message, Map<String, String> authData) { |
112 | 113 | return Flux.deferWithContext(context -> Flux.from(invoke(toRequest(message)))) |
113 | | - .subscriberContext(context -> newPrincipalContext(authData)); |
| 114 | + .subscriberContext(context -> newPrincipalContext(authData, context)); |
114 | 115 | } |
115 | 116 |
|
116 | 117 | private Flux<?> deferWithContextBidirectional( |
117 | 118 | Flux<ServiceMessage> messages, Map<String, String> authData) { |
118 | 119 | return Flux.deferWithContext(context -> messages.map(this::toRequest).transform(this::invoke)) |
119 | | - .subscriberContext(context -> newPrincipalContext(authData)); |
| 120 | + .subscriberContext(context -> newPrincipalContext(authData, context)); |
120 | 121 | } |
121 | 122 |
|
122 | 123 | private Publisher<?> invoke(Object request) { |
@@ -152,11 +153,9 @@ private Mono<Map<String, String>> authenticate(ServiceMessage message) { |
152 | 153 | if (!methodInfo.isAuth()) { |
153 | 154 | return Mono.just(Collections.emptyMap()); |
154 | 155 | } |
155 | | - |
156 | 156 | if (authenticator == null) { |
157 | 157 | throw new UnauthorizedException("Authenticator not found"); |
158 | 158 | } |
159 | | - |
160 | 159 | return authenticator.authenticate(message.headers()).onErrorMap(this::toUnauthorizedException); |
161 | 160 | } |
162 | 161 |
|
@@ -202,7 +201,10 @@ private ServiceMessage toResponse(Object response, String dataFormat) { |
202 | 201 | .build(); |
203 | 202 | } |
204 | 203 |
|
205 | | - private Context newPrincipalContext(Map<String, String> authData) { |
| 204 | + private Context newPrincipalContext(Map<String, String> authData, Context context) { |
| 205 | + if (principalMapper == null) { |
| 206 | + return context; |
| 207 | + } |
206 | 208 | Object value = principalMapper.map(authData); |
207 | 209 | return Context.of(value.getClass(), value); |
208 | 210 | } |
|
0 commit comments