1515 */
1616package org .springframework .data .repository .core .support ;
1717
18+ import kotlin .Unit ;
1819import kotlin .reflect .KFunction ;
20+ import kotlinx .coroutines .flow .Flow ;
1921import reactor .core .publisher .Flux ;
2022import reactor .core .publisher .Mono ;
2123
2224import java .lang .reflect .InvocationTargetException ;
2325import java .lang .reflect .Method ;
26+ import java .util .Collection ;
2427import java .util .stream .Stream ;
2528
2629import org .reactivestreams .Publisher ;
@@ -54,25 +57,63 @@ abstract class RepositoryMethodInvoker {
5457 private final Class <?> returnedType ;
5558 private final Invokable invokable ;
5659 private final boolean suspendedDeclaredMethod ;
57- private final boolean returnsReactiveType ;
5860
61+ @ SuppressWarnings ("ReactiveStreamsUnusedPublisher" )
5962 protected RepositoryMethodInvoker (Method method , Invokable invokable ) {
6063
6164 this .method = method ;
62- this .invokable = invokable ;
6365
6466 if (KotlinDetector .isKotlinReflectPresent ()) {
6567
6668 this .suspendedDeclaredMethod = KotlinReflectionUtils .isSuspend (method );
6769 this .returnedType = this .suspendedDeclaredMethod ? KotlinReflectionUtils .getReturnType (method )
6870 : method .getReturnType ();
71+
72+ // special case for most query methods: These can return Flux but we don't want to fail later on if the method
73+ // is void.
74+ if (suspendedDeclaredMethod ) {
75+
76+ this .invokable = args -> {
77+
78+ Object result = invokable .invoke (args );
79+
80+ if (returnedType == Unit .class ) {
81+
82+ if (result instanceof Mono <?> m ) {
83+ return m .then ();
84+ }
85+
86+ Flux <?> flux = result instanceof Flux <?> f ? f : ReactiveWrapperConverters .toWrapper (result , Flux .class );
87+ return flux .then ();
88+ }
89+
90+ if (returnedType != Flow .class ) {
91+
92+ if (result instanceof Mono <?> m ) {
93+ return m ;
94+ }
95+
96+ Flux <?> flux = result instanceof Flux <?> f ? f : ReactiveWrapperConverters .toWrapper (result , Flux .class );
97+
98+ if (Collection .class .isAssignableFrom (returnedType )) {
99+ return flux .collectList ();
100+ }
101+
102+ return flux .singleOrEmpty ();
103+ }
104+
105+ return result ;
106+ };
107+ } else {
108+ this .invokable = invokable ;
109+ }
110+
69111 } else {
70112
71113 this .suspendedDeclaredMethod = false ;
72114 this .returnedType = method .getReturnType ();
115+ this .invokable = invokable ;
73116 }
74-
75- this .returnsReactiveType = ReactiveWrappers .supports (returnedType );
76117 }
77118
78119 static RepositoryQueryMethodInvoker forRepositoryQuery (Method declaredMethod , RepositoryQuery query ) {
@@ -154,7 +195,7 @@ private RepositoryMethodInvocation computeInvocationResult(RepositoryMethodInvoc
154195 interface Invokable {
155196
156197 @ Nullable
157- Object invoke (Object [] args ) throws ReflectiveOperationException ;
198+ Object invoke (Object [] args ) throws Exception ;
158199 }
159200
160201 /**
@@ -214,8 +255,6 @@ Publisher<Object> decorate(Class<?> repositoryInterface, RepositoryInvocationMul
214255 */
215256 private static class RepositoryFragmentMethodInvoker extends RepositoryMethodInvoker {
216257
217- private final CoroutineAdapterInformation adapterInformation ;
218-
219258 public RepositoryFragmentMethodInvoker (Method declaredMethod , Object instance , Method baseClassMethod ) {
220259 this (CoroutineAdapterInformation .create (declaredMethod , baseClassMethod ), declaredMethod , instance ,
221260 baseClassMethod );
@@ -236,13 +275,12 @@ public RepositoryFragmentMethodInvoker(CoroutineAdapterInformation adapterInform
236275 return AopUtils .invokeJoinpointUsingReflection (instance , baseClassMethod , invocationArguments );
237276 }
238277 return AopUtils .invokeJoinpointUsingReflection (instance , baseClassMethod , args );
239- } catch (RuntimeException e ) {
278+ } catch (Exception e ) {
240279 throw e ;
241280 } catch (Throwable e ) {
242281 throw new RuntimeException (e );
243282 }
244283 });
245- this .adapterInformation = adapterInformation ;
246284 }
247285
248286 /**
0 commit comments