22
33import static io .quarkus .devui .runtime .jsonrpc .JsonRpcKeys .MessageType ;
44
5- import java .lang .reflect .InvocationTargetException ;
65import java .lang .reflect .Method ;
76import java .util .ArrayList ;
87import java .util .HashMap ;
1110import java .util .concurrent .ConcurrentHashMap ;
1211
1312import jakarta .enterprise .context .ApplicationScoped ;
13+ import jakarta .inject .Inject ;
14+
15+ import org .jboss .logging .Logger ;
1416
1517import io .quarkus .arc .Arc ;
1618import io .quarkus .devui .runtime .jsonrpc .JsonRpcMethod ;
1719import io .quarkus .devui .runtime .jsonrpc .JsonRpcMethodName ;
1820import io .quarkus .devui .runtime .jsonrpc .JsonRpcReader ;
1921import io .quarkus .devui .runtime .jsonrpc .JsonRpcWriter ;
2022import io .smallrye .mutiny .Multi ;
23+ import io .smallrye .mutiny .Uni ;
24+ import io .smallrye .mutiny .infrastructure .Infrastructure ;
2125import io .smallrye .mutiny .subscription .Cancellable ;
26+ import io .smallrye .mutiny .unchecked .Unchecked ;
2227import io .vertx .core .http .ServerWebSocket ;
2328import io .vertx .core .json .JsonObject ;
2429
@@ -60,7 +65,7 @@ public void populateJsonRPCMethods(Map<String, Map<JsonRpcMethodName, JsonRpcMet
6065 javaMethod = providerInstance .getClass ().getMethod (jsonRpcMethod .getMethodName ());
6166 }
6267 ReflectionInfo reflectionInfo = new ReflectionInfo (jsonRpcMethod .getClazz (), providerInstance , javaMethod ,
63- params );
68+ params , jsonRpcMethod . getExplicitlyBlocking (), jsonRpcMethod . getExplicitlyNonBlocking () );
6469 String jsonRpcMethodName = extensionName + DOT + methodName ;
6570 jsonRpcToJava .put (jsonRpcMethodName , reflectionInfo );
6671 } catch (NoSuchMethodException | SecurityException ex ) {
@@ -70,21 +75,40 @@ public void populateJsonRPCMethods(Map<String, Map<JsonRpcMethodName, JsonRpcMet
7075 }
7176 }
7277
78+ private Uni <?> invoke (ReflectionInfo info , Object target , Object [] args ) {
79+ if (info .isReturningUni ()) {
80+ try {
81+ Uni <?> uni = ((Uni <?>) info .method .invoke (target , args ));
82+ if (info .isExplicitlyBlocking ()) {
83+ return uni .runSubscriptionOn (Infrastructure .getDefaultExecutor ());
84+ } else {
85+ return uni ;
86+ }
87+ } catch (Exception e ) {
88+ return Uni .createFrom ().failure (e );
89+ }
90+ } else {
91+ Uni <?> uni = Uni .createFrom ().item (Unchecked .supplier (() -> info .method .invoke (target , args )));
92+ if (!info .isExplicitlyNonBlocking ()) {
93+ return uni .runSubscriptionOn (Infrastructure .getDefaultExecutor ());
94+ } else {
95+ return uni ;
96+ }
97+ }
98+ }
99+
73100 public void addSocket (ServerWebSocket socket ) {
74101 socket .textMessageHandler ((e ) -> {
75- socket .writeTextMessage (route (e , socket ));
102+ JsonRpcReader jsonRpcRequest = JsonRpcReader .read (e );
103+ route (jsonRpcRequest , socket );
76104 });
77105 }
78106
79- private String route (String message , ServerWebSocket s ) {
80- JsonRpcReader jsonRpcRequest = JsonRpcReader .read (message );
81- JsonObject jsonRpcResponse = route (jsonRpcRequest , s );
82- return jsonRpcResponse .encodePrettily ();
83- }
107+ @ Inject
108+ Logger logger ;
84109
85110 @ SuppressWarnings ("unchecked" )
86- private JsonObject route (JsonRpcReader jsonRpcRequest , ServerWebSocket s ) {
87-
111+ private void route (JsonRpcReader jsonRpcRequest , ServerWebSocket s ) {
88112 String jsonRpcMethodName = jsonRpcRequest .getMethod ();
89113
90114 // First check some internal methods
@@ -95,52 +119,72 @@ private JsonObject route(JsonRpcReader jsonRpcRequest, ServerWebSocket s) {
95119 Cancellable cancellable = this .subscriptions .remove (jsonRpcRequest .getId ());
96120 cancellable .cancel ();
97121 }
98- return jsonRpcResponse ;
99-
122+ s .writeTextMessage (jsonRpcResponse .encode ());
100123 } else if (this .jsonRpcToJava .containsKey (jsonRpcMethodName )) { // Route to extension
101124 ReflectionInfo reflectionInfo = this .jsonRpcToJava .get (jsonRpcMethodName );
102- Object providerInstance = Arc .container ().select (reflectionInfo .bean ).get ();
103- try {
104- Object result ;
105- if (jsonRpcRequest .hasParams ()) {
106- Object [] args = getArgsAsObjects (reflectionInfo .params , jsonRpcRequest );
107- result = reflectionInfo .method .invoke (providerInstance , args );
108- } else {
109- result = reflectionInfo .method .invoke (providerInstance );
110- }
125+ Object target = Arc .container ().select (reflectionInfo .bean ).get ();
111126
112- // Here wrap in our own object that contain some more metadata
113- JsonObject jsonRpcResponse ;
114- if (reflectionInfo .isSubscription ()) {
115- // Subscription
116- Multi <?> subscription = (Multi ) result ;
117-
118- // TODO: If Jackson is on the classpath ?
119-
120- Cancellable cancellable = subscription .subscribe ().with ((t ) -> {
121- JsonObject jsonResponse = JsonRpcWriter .writeResponse (jsonRpcRequest .getId (), t ,
122- MessageType .SubscriptionMessage );
123- s .writeTextMessage (jsonResponse .encodePrettily ());
124- });
125-
126- this .subscriptions .put (jsonRpcRequest .getId (), cancellable );
127-
128- jsonRpcResponse = JsonRpcWriter .writeResponse (jsonRpcRequest .getId (), null , MessageType .Void );
129-
130- } else {
131- // Normal response
132- jsonRpcResponse = JsonRpcWriter .writeResponse (jsonRpcRequest .getId (), result , MessageType .Response );
127+ if (reflectionInfo .isReturningMulti ()) {
128+ Multi <?> multi ;
129+ try {
130+ if (jsonRpcRequest .hasParams ()) {
131+ Object [] args = getArgsAsObjects (reflectionInfo .params , jsonRpcRequest );
132+ multi = (Multi <?>) reflectionInfo .method .invoke (target , args );
133+ } else {
134+ multi = (Multi <?>) reflectionInfo .method .invoke (target );
135+ }
136+ } catch (Exception e ) {
137+ logger .errorf (e , "Unable to invoke method %s using JSON-RPC, request was: %s" , jsonRpcMethodName ,
138+ jsonRpcRequest );
139+ s .writeTextMessage (JsonRpcWriter .writeErrorResponse (jsonRpcRequest .getId (), jsonRpcMethodName , e ).encode ());
140+ return ;
133141 }
134142
135- return jsonRpcResponse ;
136- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex ) {
137- throw new RuntimeException (ex );
143+ Cancellable cancellable = multi .subscribe ()
144+ .with (
145+ item -> {
146+ JsonObject jsonResponse = JsonRpcWriter .writeResponse (jsonRpcRequest .getId (), item ,
147+ MessageType .SubscriptionMessage );
148+ s .writeTextMessage (jsonResponse .encodePrettily ());
149+ },
150+ failure -> {
151+ s .writeTextMessage (JsonRpcWriter
152+ .writeErrorResponse (jsonRpcRequest .getId (), jsonRpcMethodName , failure ).encode ());
153+ this .subscriptions .remove (jsonRpcRequest .getId ());
154+ },
155+ () -> this .subscriptions .remove (jsonRpcRequest .getId ()));
156+
157+ this .subscriptions .put (jsonRpcRequest .getId (), cancellable );
158+ s .writeTextMessage (JsonRpcWriter .writeResponse (jsonRpcRequest .getId (), null , MessageType .Void ).encode ());
159+ } else {
160+ // The invocation will return a Uni<JsonObject>
161+ Uni <?> uni ;
162+ try {
163+ if (jsonRpcRequest .hasParams ()) {
164+ Object [] args = getArgsAsObjects (reflectionInfo .params , jsonRpcRequest );
165+ uni = invoke (reflectionInfo , target , args );
166+ } else {
167+ uni = invoke (reflectionInfo , target , new Object [0 ]);
168+ }
169+ } catch (Exception e ) {
170+ logger .errorf (e , "Unable to invoke method %s using JSON-RPC, request was: %s" , jsonRpcMethodName ,
171+ jsonRpcRequest );
172+ s .writeTextMessage (JsonRpcWriter .writeErrorResponse (jsonRpcRequest .getId (), jsonRpcMethodName , e ).encode ());
173+ return ;
174+ }
175+ uni .subscribe ()
176+ .with (item -> {
177+ s .writeTextMessage (JsonRpcWriter .writeResponse (jsonRpcRequest .getId (), item ,
178+ MessageType .Response ).encode ());
179+ }, failure -> {
180+ s .writeTextMessage (JsonRpcWriter
181+ .writeErrorResponse (jsonRpcRequest .getId (), jsonRpcMethodName , failure ).encode ());
182+ });
138183 }
184+ } else {
185+ // Method not found
186+ s .writeTextMessage (JsonRpcWriter .writeMethodNotFoundResponse (jsonRpcRequest .getId (), jsonRpcMethodName ).encode ());
139187 }
140-
141- // Method not found
142- return JsonRpcWriter .writeMethodNotFoundResponse (jsonRpcRequest .getId (), jsonRpcMethodName );
143-
144188 }
145189
146190 private Object [] getArgsAsObjects (Map <String , Class > params , JsonRpcReader jsonRpcRequest ) {
0 commit comments