Skip to content

Commit b790ac6

Browse files
committed
exposing client ExecutorService for multi-threading support
1 parent 19589fa commit b790ac6

File tree

3 files changed

+54
-12
lines changed

3 files changed

+54
-12
lines changed

README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,31 @@ client.on("topic1.*", (Object... eventArgs) -> {
147147
});
148148
```
149149

150+
#### Concurrency Support
151+
152+
The underlying EventEmitter implementation uses a single thread for event handling. If you need to handle events concurrently,
153+
consider using a thread pool or executor service to process events in parallel.
154+
155+
You can access the client `ExecutorService` instance by calling the `getExecutorService()` method:
156+
157+
```java
158+
client.on("secure/inbound.gettime", (Object... eventArgs) -> {
159+
var replyFn = (ReplyFunction) eventArgs[1];
160+
logger.info("Responding to gettime request on a separate thread...");
161+
162+
client.getExecutorService().submit(() -> {
163+
try {
164+
var response = Map.of("time", new Date());
165+
// Send a reply and wait for acknowledgment
166+
replyFn.reply(response, "ok", false).waitForAck().get();
167+
logger.info("Response delivered!");
168+
} catch (Exception e) {
169+
logger.log(Level.SEVERE, "Failed to send reply", e);
170+
}
171+
});
172+
});
173+
```
174+
150175
### Publishing Messages
151176

152177
Publish messages to a topic:
@@ -254,6 +279,12 @@ Creates a new `RealtimeClient` instance.
254279

255280
#### Methods
256281

282+
- **getExecutorService()**: Returns the `ExecutorService` instance used by the client.
283+
284+
```java
285+
public ExecutorService getExecutorService();
286+
```
287+
257288
- **connect()**: Connects the client to the WebSocket Messaging Gateway.
258289

259290
```java

src/main/java/realtime/pubsub/RealtimeClient.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,18 @@ public ReplyFunction createReplyFunction(IncomingMessage message) {
115115
};
116116
}
117117

118+
/**
119+
* Gets the ExecutorService used by the RealtimeClient. A fixed thread pool (size = number of processors).
120+
*
121+
* @return The ExecutorService instance.
122+
*/
123+
public ExecutorService getExecutorService() {
124+
return executorService;
125+
}
126+
118127
private void onAck(Object[] args) {
119128
if (args.length == 0) return;
120-
var message = (IncomingMessage)(args[0]);
129+
var message = (IncomingMessage) (args[0]);
121130
if (message == null) return;
122131

123132
Map<String, Object> data = safeCastToMap(message.getData());
@@ -132,7 +141,7 @@ private void onAck(Object[] args) {
132141

133142
private void onResponse(Object[] args) {
134143
if (args.length == 0) return;
135-
var message = (IncomingMessage)(args[0]);
144+
var message = (IncomingMessage) (args[0]);
136145
if (message == null) return;
137146

138147
String topic = (String) message.getTopic();
@@ -153,7 +162,7 @@ private void onResponse(Object[] args) {
153162

154163
private void onWelcome(Object[] args) {
155164
if (args.length == 0) return;
156-
var message = (IncomingMessage)(args[0]);
165+
var message = (IncomingMessage) (args[0]);
157166
if (message == null) return;
158167

159168
Map<String, Object> data = safeCastToMap(message.getData());
@@ -550,15 +559,7 @@ private String maskUrl(String url) {
550559
try {
551560
URI uri = new URI(url);
552561
String maskedQuery = (uri.getQuery() != null) ? "****" : null;
553-
URI maskedUri = new URI(
554-
uri.getScheme(),
555-
uri.getUserInfo(),
556-
uri.getHost(),
557-
uri.getPort(),
558-
uri.getPath(),
559-
maskedQuery,
560-
uri.getFragment()
561-
);
562+
URI maskedUri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(), maskedQuery, uri.getFragment());
562563
return maskedUri.toString();
563564
} catch (URISyntaxException e) {
564565
// If masking fails, return a generic message

src/test/java/realtime/pubsub/RealtimeClientTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ protected WebSocketContainer createWebSocketContainer() {
5757
}).when(containerMock).connectToServer(any(Endpoint.class), any(ClientEndpointConfig.class), any(URI.class));
5858
}
5959

60+
@Test
61+
public void testGetExecutorService() {
62+
// Act
63+
ExecutorService executorService = client.getExecutorService();
64+
65+
// Assert
66+
assertNotNull(executorService);
67+
assertInstanceOf(ThreadPoolExecutor.class, executorService);
68+
}
69+
6070
@Test
6171
public void testConnect() throws Exception {
6272
// Act

0 commit comments

Comments
 (0)