|
15 | 15 | */ |
16 | 16 | package org.springframework.data.repository.core.support; |
17 | 17 |
|
18 | | -import kotlin.coroutines.Continuation; |
19 | 18 | import kotlin.reflect.KFunction; |
20 | | -import kotlinx.coroutines.reactive.AwaitKt; |
21 | 19 | import reactor.core.publisher.Flux; |
22 | 20 | import reactor.core.publisher.Mono; |
23 | 21 |
|
24 | 22 | import java.lang.reflect.InvocationTargetException; |
25 | 23 | import java.lang.reflect.Method; |
26 | | -import java.util.Collection; |
27 | 24 | import java.util.stream.Stream; |
28 | 25 |
|
29 | 26 | import org.reactivestreams.Publisher; |
| 27 | +import org.springframework.aop.support.AopUtils; |
30 | 28 | import org.springframework.core.KotlinDetector; |
31 | 29 | import org.springframework.data.repository.core.support.RepositoryMethodInvocationListener.RepositoryMethodInvocation; |
32 | 30 | import org.springframework.data.repository.core.support.RepositoryMethodInvocationListener.RepositoryMethodInvocationResult; |
@@ -116,12 +114,7 @@ public static boolean canInvoke(Method declaredMethod, Method baseClassMethod) { |
116 | 114 | @Nullable |
117 | 115 | public Object invoke(Class<?> repositoryInterface, RepositoryInvocationMulticaster multicaster, Object[] args) |
118 | 116 | throws Exception { |
119 | | - return shouldAdaptReactiveToSuspended() ? doInvokeReactiveToSuspended(repositoryInterface, multicaster, args) |
120 | | - : doInvoke(repositoryInterface, multicaster, args); |
121 | | - } |
122 | | - |
123 | | - protected boolean shouldAdaptReactiveToSuspended() { |
124 | | - return suspendedDeclaredMethod; |
| 117 | + return doInvoke(repositoryInterface, multicaster, args); |
125 | 118 | } |
126 | 119 |
|
127 | 120 | @Nullable |
@@ -153,46 +146,6 @@ private Object doInvoke(Class<?> repositoryInterface, RepositoryInvocationMultic |
153 | 146 | } |
154 | 147 | } |
155 | 148 |
|
156 | | - @Nullable |
157 | | - @SuppressWarnings({ "unchecked", "ConstantConditions" }) |
158 | | - private Object doInvokeReactiveToSuspended(Class<?> repositoryInterface, RepositoryInvocationMulticaster multicaster, |
159 | | - Object[] args) throws Exception { |
160 | | - |
161 | | - /* |
162 | | - * Kotlin suspended functions are invoked with a synthetic Continuation parameter that keeps track of the Coroutine context. |
163 | | - * We're invoking a method without Continuation as we expect the method to return any sort of reactive type, |
164 | | - * therefore we need to strip the Continuation parameter. |
165 | | - */ |
166 | | - Continuation<Object> continuation = (Continuation) args[args.length - 1]; |
167 | | - args[args.length - 1] = null; |
168 | | - |
169 | | - RepositoryMethodInvocationCaptor invocationResultCaptor = RepositoryMethodInvocationCaptor |
170 | | - .captureInvocationOn(repositoryInterface); |
171 | | - try { |
172 | | - |
173 | | - Publisher<?> result = new ReactiveInvocationListenerDecorator().decorate(repositoryInterface, multicaster, args, |
174 | | - invokable.invoke(args)); |
175 | | - |
176 | | - if (returnsReactiveType) { |
177 | | - return ReactiveWrapperConverters.toWrapper(result, returnedType); |
178 | | - } |
179 | | - |
180 | | - if (Collection.class.isAssignableFrom(returnedType)) { |
181 | | - result = (Publisher<?>) collectToList(result); |
182 | | - } |
183 | | - |
184 | | - return AwaitKt.awaitFirstOrNull(result, continuation); |
185 | | - } catch (Exception e) { |
186 | | - multicaster.notifyListeners(method, args, computeInvocationResult(invocationResultCaptor.error(e))); |
187 | | - throw e; |
188 | | - } |
189 | | - } |
190 | | - |
191 | | - // to avoid NoClassDefFoundError: org/reactivestreams/Publisher when loading this class ¯\_(ツ)_/¯ |
192 | | - private static Object collectToList(Object result) { |
193 | | - return Flux.from((Publisher<?>) result).collectList(); |
194 | | - } |
195 | | - |
196 | 149 | private RepositoryMethodInvocation computeInvocationResult(RepositoryMethodInvocationCaptor captured) { |
197 | 150 | return new RepositoryMethodInvocation(captured.getRepositoryInterface(), method, captured.getCapturedResult(), |
198 | 151 | captured.getDuration()); |
@@ -271,30 +224,27 @@ public RepositoryFragmentMethodInvoker(Method declaredMethod, Object instance, M |
271 | 224 | public RepositoryFragmentMethodInvoker(CoroutineAdapterInformation adapterInformation, Method declaredMethod, |
272 | 225 | Object instance, Method baseClassMethod) { |
273 | 226 | super(declaredMethod, args -> { |
274 | | - |
275 | | - if (adapterInformation.isAdapterMethod()) { |
276 | | - |
277 | | - /* |
278 | | - * Kotlin suspended functions are invoked with a synthetic Continuation parameter that keeps track of the Coroutine context. |
279 | | - * We're invoking a method without Continuation as we expect the method to return any sort of reactive type, |
280 | | - * therefore we need to strip the Continuation parameter. |
281 | | - */ |
282 | | - Object[] invocationArguments = new Object[args.length - 1]; |
283 | | - System.arraycopy(args, 0, invocationArguments, 0, invocationArguments.length); |
284 | | - |
285 | | - return baseClassMethod.invoke(instance, invocationArguments); |
| 227 | + try { |
| 228 | + if (adapterInformation.shouldAdaptReactiveToSuspended()) { |
| 229 | + /* |
| 230 | + * Kotlin suspended functions are invoked with a synthetic Continuation parameter that keeps track of the Coroutine context. |
| 231 | + * We're invoking a method without Continuation as we expect the method to return any sort of reactive type, |
| 232 | + * therefore we need to strip the Continuation parameter. |
| 233 | + */ |
| 234 | + Object[] invocationArguments = new Object[args.length - 1]; |
| 235 | + System.arraycopy(args, 0, invocationArguments, 0, invocationArguments.length); |
| 236 | + return AopUtils.invokeJoinpointUsingReflection(instance, baseClassMethod, invocationArguments); |
| 237 | + } |
| 238 | + return AopUtils.invokeJoinpointUsingReflection(instance, baseClassMethod, args); |
| 239 | + } catch (RuntimeException e) { |
| 240 | + throw e; |
| 241 | + } catch (Throwable e) { |
| 242 | + throw new RuntimeException(e); |
286 | 243 | } |
287 | | - |
288 | | - return baseClassMethod.invoke(instance, args); |
289 | 244 | }); |
290 | 245 | this.adapterInformation = adapterInformation; |
291 | 246 | } |
292 | 247 |
|
293 | | - @Override |
294 | | - protected boolean shouldAdaptReactiveToSuspended() { |
295 | | - return adapterInformation.shouldAdaptReactiveToSuspended(); |
296 | | - } |
297 | | - |
298 | 248 | /** |
299 | 249 | * Value object capturing whether a suspended Kotlin method (Coroutine method) can be bridged with a native or |
300 | 250 | * reactive fragment method. |
|
0 commit comments