1+ /* Autogenerated file, do not edit manually */
2+ package chat .delta .rpc ;
3+
4+ import chat .delta .util .SettableFuture ;
5+
6+ import com .fasterxml .jackson .core .JsonProcessingException ;
7+ import com .fasterxml .jackson .core .type .TypeReference ;
8+ import com .fasterxml .jackson .databind .JsonNode ;
9+ import com .fasterxml .jackson .databind .ObjectMapper ;
10+
11+ import java .io .IOException ;
12+ import java .util .Map ;
13+ import java .util .concurrent .ConcurrentHashMap ;
14+ import java .util .concurrent .ExecutionException ;
15+
16+ /* Basic RPC Transport implementation */
17+ public abstract class BaseTransport implements Rpc .Transport {
18+ private final Map <Integer , SettableFuture <JsonNode >> requestFutures = new ConcurrentHashMap <>();
19+ private int requestId = 0 ;
20+ private final ObjectMapper mapper = new ObjectMapper ();
21+ private Thread worker ;
22+
23+ /* Send a Request as raw JSON String to the RPC server */
24+ protected abstract void sendRequest (String jsonRequest );
25+
26+ /* Get next Response as raw JSON String from the RPC server */
27+ protected abstract String getResponse ();
28+
29+ public ObjectMapper getObjectMapper () {
30+ return mapper ;
31+ }
32+
33+ public void call (String method , JsonNode ... params ) throws RpcException {
34+ innerCall (method , params );
35+ }
36+
37+ public <T > T callForResult (TypeReference <T > resultType , String method , JsonNode ... params ) throws RpcException {
38+ try {
39+ JsonNode node = innerCall (method , params );
40+ if (node .isNull ()) return null ;
41+ return mapper .readValue (node .traverse (), resultType );
42+ } catch (IOException e ) {
43+ throw new RpcException (e .getMessage ());
44+ }
45+ }
46+
47+ private JsonNode innerCall (String method , JsonNode ... params ) throws RpcException {
48+ int id ;
49+ synchronized (this ) {
50+ id = ++requestId ;
51+ ensureWorkerThread ();
52+ }
53+ try {
54+ String jsonRequest = mapper .writeValueAsString (new Request (method , params , id ));
55+ SettableFuture <JsonNode > future = new SettableFuture <>();
56+ requestFutures .put (id , future );
57+ sendRequest (jsonRequest );
58+ return future .get ();
59+ } catch (ExecutionException e ) {
60+ throw (RpcException )e .getCause ();
61+ } catch (InterruptedException e ) {
62+ throw new RpcException (e .getMessage ());
63+ } catch (JsonProcessingException e ) {
64+ throw new RpcException (e .getMessage ());
65+ }
66+ }
67+
68+ private void ensureWorkerThread () {
69+ if (worker != null ) return ;
70+
71+ worker = new Thread (() -> {
72+ while (true ) {
73+ try {
74+ processResponse ();
75+ } catch (JsonProcessingException e ) {
76+ e .printStackTrace ();
77+ }
78+ }
79+ }, "jsonrpcThread" );
80+ worker .start ();
81+ }
82+
83+ private void processResponse () throws JsonProcessingException {
84+ String jsonResponse = getResponse ();
85+ Response response = mapper .readValue (jsonResponse , Response .class );
86+
87+ if (response .id == 0 ) { // Got JSON-RPC notification/event, ignore
88+ return ;
89+ }
90+
91+ SettableFuture <JsonNode > future = requestFutures .remove (response .id );
92+ if (future == null ) { // Got a response with unknown ID, ignore
93+ return ;
94+ }
95+
96+ if (response .error != null ) {
97+ future .setException (new RpcException (response .error .toString ()));
98+ } else if (response .result != null ) {
99+ future .set (response .result );
100+ } else {
101+ future .setException (new RpcException ("Got JSON-RPC response without result or error: " + jsonResponse ));
102+ }
103+ }
104+
105+ private static class Request {
106+ private final String jsonrpc = "2.0" ;
107+ public final String method ;
108+ public final JsonNode [] params ;
109+ public final int id ;
110+
111+ public Request (String method , JsonNode [] params , int id ) {
112+ this .method = method ;
113+ this .params = params ;
114+ this .id = id ;
115+ }
116+ }
117+
118+ private static class Response {
119+ public String jsonrpc ;
120+ public int id ;
121+ public JsonNode result ;
122+ public JsonNode error ;
123+ }
124+ }
0 commit comments