1111import org .slf4j .Logger ;
1212import org .slf4j .LoggerFactory ;
1313import org .zeromq .ZMQ ;
14+ import org .zeromq .ZMQException ;
1415
16+ /** TRex Transport class to create zmq socket for connection to trex server */
1517public class TRexTransport {
1618
1719 private static final Logger LOGGER = LoggerFactory .getLogger (TRexTransport .class );
18-
19- public static final int DEFAULT_TIMEOUT = 3000 ;
20-
20+ private static final int DEFAULT_TIMEOUT = 3000 ;
21+ private static final String PROTOCOL = "tcp" ;
2122 private final String connectionString ;
22-
2323 private final IDataCompressor dataCompressor ;
24-
25- private ZMQ . Context zmqCtx = ZMQ .context ( 1 ) ;
26-
27- private ZMQ . Socket zmqSocket ;
28-
29- private String protocol = "tcp" ;
30-
31- private String host ;
32-
33- private String port ;
34-
24+ private final ZMQ . Context zmqCtx = ZMQ . context ( 1 );
25+ private final ZMQ .Socket zmqSocket ;
26+ private final String host ;
27+ private final String port ;
28+
29+ /**
30+ * @param host Server address
31+ * @param port Server port
32+ * @param timeout How long to wait for server response
33+ * @param dataCompressor
34+ */
3535 public TRexTransport (String host , String port , int timeout , IDataCompressor dataCompressor ) {
3636 this .host = host ;
3737 this .port = port ;
38- zmqSocket = zmqCtx .socket (ZMQ .REQ );
38+ this . zmqSocket = zmqCtx .socket (ZMQ .REQ );
3939 int actualTimeout = timeout <= 0 ? DEFAULT_TIMEOUT : timeout ;
4040 zmqSocket .setReceiveTimeOut (actualTimeout );
4141 zmqSocket .setSendTimeOut (actualTimeout );
42- connectionString = protocol + "://" + this .host + ":" + this .port ;
42+ this . connectionString = PROTOCOL + "://" + this .host + ":" + this .port ;
4343 zmqSocket .connect (connectionString );
4444 this .dataCompressor = dataCompressor ;
4545 }
4646
47+ /**
48+ * @param host Server address
49+ * @param port Server port
50+ * @param timeout How long to wait for server response
51+ */
4752 public TRexTransport (String host , String port , int timeout ) {
4853 this (host , port , timeout , new TRexDataCompressor ());
4954 }
5055
56+ /**
57+ * Send TRex command to the server
58+ *
59+ * @param command
60+ * @return RPCResponse
61+ * @throws IOException
62+ */
5163 public RPCResponse sendCommand (TRexCommand command ) throws IOException {
5264 String json = new ObjectMapper ().writeValueAsString (command .getParameters ());
5365 String response = sendJson (json );
@@ -60,6 +72,13 @@ public RPCResponse sendCommand(TRexCommand command) throws IOException {
6072 return objectMapper .readValue (response , RPCResponse .class );
6173 }
6274
75+ /**
76+ * Send TRex command list to the server
77+ *
78+ * @param commands
79+ * @return RPCResponse
80+ * @throws IOException
81+ */
6382 public RPCResponse [] sendCommands (List <TRexCommand > commands ) throws IOException {
6483 if (commands .size () == 1 ) {
6584 return new RPCResponse [] {sendCommand (commands .get (0 ))};
@@ -77,34 +96,53 @@ public RPCResponse[] sendCommands(List<TRexCommand> commands) throws IOException
7796 return new ObjectMapper ().readValue (response , RPCResponse [].class );
7897 }
7998
99+ /** @return server host address */
80100 public String getHost () {
81101 return host ;
82102 }
83103
104+ /** @return server port */
84105 public String getPort () {
85106 return port ;
86107 }
87108
109+ /** close zmq connection to server */
88110 public synchronized void close () {
89111 zmqSocket .disconnect (connectionString );
90112 zmqSocket .close ();
91113 zmqCtx .close ();
92114 }
93115
116+ /** @return zmq socket */
94117 public ZMQ .Socket getSocket () {
95118 return zmqSocket ;
96119 }
97120
121+ /**
122+ * Send json string to server
123+ *
124+ * @param json
125+ * @return json string
126+ */
98127 public synchronized String sendJson (String json ) {
99- LOGGER .debug ("JSON Req: " + json );
128+ LOGGER .debug ("JSON Req: {}" , json );
100129
101130 byte [] compressed = this .dataCompressor .compressStringToBytes (json );
102131
103- zmqSocket .send (compressed );
132+ try {
133+ zmqSocket .send (compressed );
134+ } catch (ZMQException e ) {
135+ throw new IllegalStateException (
136+ "Did not get any response from server "
137+ + getHost ()
138+ + " within timeout "
139+ + zmqSocket .getReceiveTimeOut (),
140+ e );
141+ }
104142 byte [] msg = zmqSocket .recv ();
105143
106144 String response = this .dataCompressor .decompressBytesToString (msg );
107- LOGGER .debug ("JSON Resp: " + response );
145+ LOGGER .debug ("JSON Resp: {}" , response );
108146 return response ;
109147 }
110148}
0 commit comments