Skip to content

Commit a87c626

Browse files
committed
Move MessageConsumer and MessageProducer to jsonrpc package
1 parent bfe63de commit a87c626

File tree

3 files changed

+192
-191
lines changed

3 files changed

+192
-191
lines changed

src/main/java/api/GoblintServiceLauncher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package api;
22

33
import api.json.GoblintMessageJsonHandler;
4-
import api.json.GoblintSocketMessageConsumer;
5-
import api.json.GoblintSocketMessageProducer;
4+
import api.jsonrpc.GoblintSocketMessageConsumer;
5+
import api.jsonrpc.GoblintSocketMessageProducer;
66
import api.jsonrpc.AutoClosingMessageProcessor;
77
import api.jsonrpc.CloseableEndpoint;
88
import gobpie.GobPieException;
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,56 @@
1-
package api.json;
2-
3-
import org.apache.logging.log4j.LogManager;
4-
import org.apache.logging.log4j.Logger;
5-
import org.eclipse.lsp4j.jsonrpc.JsonRpcException;
6-
import org.eclipse.lsp4j.jsonrpc.MessageConsumer;
7-
import org.eclipse.lsp4j.jsonrpc.json.MessageConstants;
8-
import org.eclipse.lsp4j.jsonrpc.json.MessageJsonHandler;
9-
import org.eclipse.lsp4j.jsonrpc.messages.Message;
10-
11-
import java.io.IOException;
12-
import java.io.OutputStream;
13-
import java.nio.charset.StandardCharsets;
14-
15-
16-
/**
17-
* A message consumer that serializes messages to JSON and sends them to an output stream.
18-
*
19-
* @since 0.0.3
20-
*/
21-
22-
public class GoblintSocketMessageConsumer implements MessageConsumer, MessageConstants {
23-
24-
private final String encoding;
25-
private final MessageJsonHandler jsonHandler;
26-
private final Object outputLock = new Object();
27-
private final OutputStream output;
28-
29-
private final Logger log = LogManager.getLogger(GoblintSocketMessageConsumer.class);
30-
31-
public GoblintSocketMessageConsumer(OutputStream output, MessageJsonHandler jsonHandler) {
32-
this(output, StandardCharsets.UTF_8.name(), jsonHandler);
33-
}
34-
35-
public GoblintSocketMessageConsumer(OutputStream output, String encoding, MessageJsonHandler jsonHandler) {
36-
this.output = output;
37-
this.encoding = encoding;
38-
this.jsonHandler = jsonHandler;
39-
}
40-
41-
@Override
42-
public void consume(Message message) {
43-
try {
44-
String content = jsonHandler.serialize(message) + "\n";
45-
byte[] contentBytes = content.getBytes(encoding);
46-
synchronized (outputLock) {
47-
output.write(contentBytes);
48-
output.flush();
49-
}
50-
log.debug("WRITTEN: {}", content);
51-
} catch (IOException exception) {
52-
throw new JsonRpcException(exception);
53-
}
54-
}
55-
56-
}
1+
package api.jsonrpc;
2+
3+
import org.apache.logging.log4j.LogManager;
4+
import org.apache.logging.log4j.Logger;
5+
import org.eclipse.lsp4j.jsonrpc.JsonRpcException;
6+
import org.eclipse.lsp4j.jsonrpc.MessageConsumer;
7+
import org.eclipse.lsp4j.jsonrpc.json.MessageConstants;
8+
import org.eclipse.lsp4j.jsonrpc.json.MessageJsonHandler;
9+
import org.eclipse.lsp4j.jsonrpc.messages.Message;
10+
11+
import java.io.IOException;
12+
import java.io.OutputStream;
13+
import java.nio.charset.StandardCharsets;
14+
15+
16+
/**
17+
* A message consumer that serializes messages to JSON and sends them to an output stream.
18+
*
19+
* @since 0.0.3
20+
*/
21+
22+
public class GoblintSocketMessageConsumer implements MessageConsumer, MessageConstants {
23+
24+
private final String encoding;
25+
private final MessageJsonHandler jsonHandler;
26+
private final Object outputLock = new Object();
27+
private final OutputStream output;
28+
29+
private final Logger log = LogManager.getLogger(GoblintSocketMessageConsumer.class);
30+
31+
public GoblintSocketMessageConsumer(OutputStream output, MessageJsonHandler jsonHandler) {
32+
this(output, StandardCharsets.UTF_8.name(), jsonHandler);
33+
}
34+
35+
public GoblintSocketMessageConsumer(OutputStream output, String encoding, MessageJsonHandler jsonHandler) {
36+
this.output = output;
37+
this.encoding = encoding;
38+
this.jsonHandler = jsonHandler;
39+
}
40+
41+
@Override
42+
public void consume(Message message) {
43+
try {
44+
String content = jsonHandler.serialize(message) + "\n";
45+
byte[] contentBytes = content.getBytes(encoding);
46+
synchronized (outputLock) {
47+
output.write(contentBytes);
48+
output.flush();
49+
}
50+
log.debug("WRITTEN: {}", content);
51+
} catch (IOException exception) {
52+
throw new JsonRpcException(exception);
53+
}
54+
}
55+
56+
}
Original file line numberDiff line numberDiff line change
@@ -1,133 +1,134 @@
1-
package api.json;
2-
3-
import org.apache.logging.log4j.LogManager;
4-
import org.eclipse.lsp4j.jsonrpc.*;
5-
import org.eclipse.lsp4j.jsonrpc.json.MessageConstants;
6-
import org.eclipse.lsp4j.jsonrpc.json.MessageJsonHandler;
7-
import org.eclipse.lsp4j.jsonrpc.json.StreamMessageProducer;
8-
import org.eclipse.lsp4j.jsonrpc.messages.Message;
9-
10-
import java.io.*;
11-
import java.util.logging.Level;
12-
import java.util.logging.Logger;
13-
14-
/**
15-
* A message producer that reads from an input stream and parses messages from JSON.
16-
*
17-
* @since 0.0.3
18-
*/
19-
20-
public class GoblintSocketMessageProducer implements MessageProducer, Closeable, MessageConstants {
21-
22-
private static final Logger LOG = Logger.getLogger(StreamMessageProducer.class.getName());
23-
24-
private final MessageJsonHandler jsonHandler;
25-
private final MessageIssueHandler issueHandler;
26-
private final BufferedReader inputReader;
27-
28-
private MessageConsumer callback;
29-
private boolean keepRunning;
30-
31-
private final org.apache.logging.log4j.Logger log = LogManager.getLogger(GoblintSocketMessageConsumer.class);
32-
33-
public GoblintSocketMessageProducer(InputStream input, MessageJsonHandler jsonHandler) {
34-
this(input, jsonHandler, null);
35-
}
36-
37-
public GoblintSocketMessageProducer(InputStream input, MessageJsonHandler jsonHandler, MessageIssueHandler issueHandler) {
38-
this.jsonHandler = jsonHandler;
39-
this.issueHandler = issueHandler;
40-
this.inputReader = new BufferedReader(new InputStreamReader(input));
41-
}
42-
43-
@Override
44-
public void listen(MessageConsumer callback) {
45-
if (keepRunning) {
46-
throw new IllegalStateException("This StreamMessageProducer is already running.");
47-
}
48-
this.keepRunning = true;
49-
this.callback = callback;
50-
try {
51-
while (keepRunning) {
52-
boolean result = handleMessage();
53-
if (!result)
54-
keepRunning = false;
55-
} // while (keepRunning)
56-
} catch (IOException exception) {
57-
if (JsonRpcException.indicatesStreamClosed(exception)) {
58-
// Only log the error if we had intended to keep running
59-
if (keepRunning)
60-
fireStreamClosed(exception);
61-
} else
62-
throw new JsonRpcException(exception);
63-
this.keepRunning = false;
64-
} finally {
65-
this.callback = null;
66-
this.keepRunning = false;
67-
}
68-
}
69-
70-
/**
71-
* Log an error.
72-
*/
73-
protected void fireError(Throwable error) {
74-
String message = error.getMessage() != null ? error.getMessage() : "An error occurred while processing an incoming message.";
75-
LOG.log(Level.SEVERE, message, error);
76-
}
77-
78-
/**
79-
* Report that the stream was closed through an exception.
80-
*/
81-
protected void fireStreamClosed(Exception cause) {
82-
String message = cause.getMessage() != null ? cause.getMessage() : "The input stream was closed.";
83-
LOG.log(Level.INFO, message, cause);
84-
}
85-
86-
87-
/**
88-
* Read the JSON content part of a message, parse it, and notify the callback.
89-
*
90-
* @return {@code true} if we should continue reading from the input stream, {@code false} if we should stop
91-
*/
92-
protected boolean handleMessage() throws IOException {
93-
if (callback == null)
94-
callback = message -> LOG.log(Level.INFO, "Received message: " + message);
95-
96-
try {
97-
String content = inputReader.readLine();
98-
log.debug("READ: {}", content);
99-
try {
100-
if (content != null) {
101-
Message message = jsonHandler.parseMessage(content);
102-
callback.consume(message);
103-
} else {
104-
return false;
105-
}
106-
} catch (MessageIssueException exception) {
107-
// An issue was found while parsing or validating the message
108-
if (issueHandler != null) {
109-
issueHandler.handle(exception.getRpcMessage(), exception.getIssues());
110-
} else {
111-
fireError(exception);
112-
for (var issue : exception.getIssues()) {
113-
fireError(issue.getCause());
114-
}
115-
}
116-
return false;
117-
}
118-
} catch (Exception exception) {
119-
// UnsupportedEncodingException can be thrown by String constructor
120-
// JsonParseException can be thrown by jsonHandler
121-
// We also catch arbitrary exceptions that are thrown by message consumers in order to keep this thread alive
122-
fireError(exception);
123-
return false;
124-
}
125-
return true;
126-
}
127-
128-
@Override
129-
public void close() {
130-
keepRunning = false;
131-
}
132-
}
133-
1+
package api.jsonrpc;
2+
3+
import api.jsonrpc.GoblintSocketMessageConsumer;
4+
import org.apache.logging.log4j.LogManager;
5+
import org.eclipse.lsp4j.jsonrpc.*;
6+
import org.eclipse.lsp4j.jsonrpc.json.MessageConstants;
7+
import org.eclipse.lsp4j.jsonrpc.json.MessageJsonHandler;
8+
import org.eclipse.lsp4j.jsonrpc.json.StreamMessageProducer;
9+
import org.eclipse.lsp4j.jsonrpc.messages.Message;
10+
11+
import java.io.*;
12+
import java.util.logging.Level;
13+
import java.util.logging.Logger;
14+
15+
/**
16+
* A message producer that reads from an input stream and parses messages from JSON.
17+
*
18+
* @since 0.0.3
19+
*/
20+
21+
public class GoblintSocketMessageProducer implements MessageProducer, Closeable, MessageConstants {
22+
23+
private static final Logger LOG = Logger.getLogger(StreamMessageProducer.class.getName());
24+
25+
private final MessageJsonHandler jsonHandler;
26+
private final MessageIssueHandler issueHandler;
27+
private final BufferedReader inputReader;
28+
29+
private MessageConsumer callback;
30+
private boolean keepRunning;
31+
32+
private final org.apache.logging.log4j.Logger log = LogManager.getLogger(GoblintSocketMessageConsumer.class);
33+
34+
public GoblintSocketMessageProducer(InputStream input, MessageJsonHandler jsonHandler) {
35+
this(input, jsonHandler, null);
36+
}
37+
38+
public GoblintSocketMessageProducer(InputStream input, MessageJsonHandler jsonHandler, MessageIssueHandler issueHandler) {
39+
this.jsonHandler = jsonHandler;
40+
this.issueHandler = issueHandler;
41+
this.inputReader = new BufferedReader(new InputStreamReader(input));
42+
}
43+
44+
@Override
45+
public void listen(MessageConsumer callback) {
46+
if (keepRunning) {
47+
throw new IllegalStateException("This StreamMessageProducer is already running.");
48+
}
49+
this.keepRunning = true;
50+
this.callback = callback;
51+
try {
52+
while (keepRunning) {
53+
boolean result = handleMessage();
54+
if (!result)
55+
keepRunning = false;
56+
} // while (keepRunning)
57+
} catch (IOException exception) {
58+
if (JsonRpcException.indicatesStreamClosed(exception)) {
59+
// Only log the error if we had intended to keep running
60+
if (keepRunning)
61+
fireStreamClosed(exception);
62+
} else
63+
throw new JsonRpcException(exception);
64+
this.keepRunning = false;
65+
} finally {
66+
this.callback = null;
67+
this.keepRunning = false;
68+
}
69+
}
70+
71+
/**
72+
* Log an error.
73+
*/
74+
protected void fireError(Throwable error) {
75+
String message = error.getMessage() != null ? error.getMessage() : "An error occurred while processing an incoming message.";
76+
LOG.log(Level.SEVERE, message, error);
77+
}
78+
79+
/**
80+
* Report that the stream was closed through an exception.
81+
*/
82+
protected void fireStreamClosed(Exception cause) {
83+
String message = cause.getMessage() != null ? cause.getMessage() : "The input stream was closed.";
84+
LOG.log(Level.INFO, message, cause);
85+
}
86+
87+
88+
/**
89+
* Read the JSON content part of a message, parse it, and notify the callback.
90+
*
91+
* @return {@code true} if we should continue reading from the input stream, {@code false} if we should stop
92+
*/
93+
protected boolean handleMessage() throws IOException {
94+
if (callback == null)
95+
callback = message -> LOG.log(Level.INFO, "Received message: " + message);
96+
97+
try {
98+
String content = inputReader.readLine();
99+
log.debug("READ: {}", content);
100+
try {
101+
if (content != null) {
102+
Message message = jsonHandler.parseMessage(content);
103+
callback.consume(message);
104+
} else {
105+
return false;
106+
}
107+
} catch (MessageIssueException exception) {
108+
// An issue was found while parsing or validating the message
109+
if (issueHandler != null) {
110+
issueHandler.handle(exception.getRpcMessage(), exception.getIssues());
111+
} else {
112+
fireError(exception);
113+
for (var issue : exception.getIssues()) {
114+
fireError(issue.getCause());
115+
}
116+
}
117+
return false;
118+
}
119+
} catch (Exception exception) {
120+
// UnsupportedEncodingException can be thrown by String constructor
121+
// JsonParseException can be thrown by jsonHandler
122+
// We also catch arbitrary exceptions that are thrown by message consumers in order to keep this thread alive
123+
fireError(exception);
124+
return false;
125+
}
126+
return true;
127+
}
128+
129+
@Override
130+
public void close() {
131+
keepRunning = false;
132+
}
133+
}
134+

0 commit comments

Comments
 (0)