Skip to content

Commit f35db18

Browse files
authored
Fix #1409 Socket Mode: Slow message consumption when listeners do not immediately return ack() (#1411)
1 parent 9e815b0 commit f35db18

File tree

4 files changed

+296
-54
lines changed

4 files changed

+296
-54
lines changed

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

Lines changed: 94 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,23 @@
33
import com.google.gson.Gson;
44
import com.google.gson.JsonElement;
55
import com.slack.api.bolt.App;
6-
import com.slack.api.bolt.request.Request;
7-
import com.slack.api.bolt.response.Response;
86
import com.slack.api.bolt.jakarta_socket_mode.request.SocketModeRequest;
97
import com.slack.api.bolt.jakarta_socket_mode.request.SocketModeRequestParser;
8+
import com.slack.api.bolt.request.Request;
9+
import com.slack.api.bolt.response.Response;
1010
import com.slack.api.jakarta_socket_mode.JakartaSocketModeClientFactory;
1111
import com.slack.api.socket_mode.SocketModeClient;
1212
import com.slack.api.socket_mode.response.AckResponse;
1313
import com.slack.api.util.json.GsonFactory;
14+
import com.slack.api.util.thread.DaemonThreadExecutorServiceFactory;
1415
import lombok.Builder;
1516
import lombok.Data;
1617
import lombok.extern.slf4j.Slf4j;
1718

1819
import java.io.IOException;
1920
import java.util.HashMap;
2021
import java.util.Map;
22+
import java.util.concurrent.ExecutorService;
2123
import java.util.function.Function;
2224
import java.util.function.Supplier;
2325

@@ -27,6 +29,7 @@ public class SocketModeApp {
2729
private final App app;
2830
private final Supplier<SocketModeClient> clientFactory;
2931
private SocketModeClient client;
32+
private final ExecutorService executorService;
3033

3134
private static final Function<ErrorContext, Response> DEFAULT_ERROR_HANDLER = (context) -> {
3235
Exception e = context.getException();
@@ -69,35 +72,22 @@ private static void sendSocketModeResponse(
6972
private static Supplier<SocketModeClient> buildSocketModeClientFactory(
7073
App app,
7174
String appToken,
72-
Function<ErrorContext, Response> errorHandler
75+
Function<ErrorContext, Response> errorHandler,
76+
ExecutorService executorService
7377
) {
7478
return () -> {
7579
try {
7680
final SocketModeClient client = JakartaSocketModeClientFactory.create(app.slack(), appToken);
7781
final SocketModeRequestParser requestParser = new SocketModeRequestParser(app.config());
7882
final Gson gson = GsonFactory.createSnakeCase(app.slack().getConfig());
7983
client.addWebSocketMessageListener(message -> {
80-
long startMillis = System.currentTimeMillis();
81-
SocketModeRequest req = requestParser.parse(message);
82-
if (req != null) {
83-
try {
84-
Response boltResponse = app.run(req.getBoltRequest());
85-
if (boltResponse.getStatusCode() != 200) {
86-
log.warn("Unsuccessful Bolt app execution (status: {}, body: {})",
87-
boltResponse.getStatusCode(), boltResponse.getBody());
88-
return;
89-
}
90-
sendSocketModeResponse(client, gson, req, boltResponse);
91-
} catch (Exception e) {
92-
ErrorContext context = ErrorContext.builder().request(req.getBoltRequest()).exception(e).build();
93-
Response errorResponse = errorHandler.apply(context);
94-
if (errorResponse != null) {
95-
sendSocketModeResponse(client, gson, req, errorResponse);
96-
}
97-
} finally {
98-
long spentMillis = System.currentTimeMillis() - startMillis;
99-
log.debug("Response time: {} milliseconds", spentMillis);
100-
}
84+
if (executorService != null) {
85+
// asynchronous
86+
executorService.execute(() -> runBoltApp(
87+
message, app, client, requestParser, errorHandler, gson));
88+
} else {
89+
// synchronous
90+
runBoltApp(message, app, client, requestParser, errorHandler, gson);
10191
}
10292
});
10393
return client;
@@ -108,34 +98,101 @@ private static Supplier<SocketModeClient> buildSocketModeClientFactory(
10898
};
10999
}
110100

101+
private static void runBoltApp(
102+
String message,
103+
App app,
104+
SocketModeClient client,
105+
SocketModeRequestParser requestParser,
106+
Function<ErrorContext, Response> errorHandler,
107+
Gson gson
108+
) {
109+
long startMillis = System.currentTimeMillis();
110+
SocketModeRequest req = requestParser.parse(message);
111+
if (req != null) {
112+
try {
113+
Response boltResponse = app.run(req.getBoltRequest());
114+
if (boltResponse.getStatusCode() != 200) {
115+
log.warn("Unsuccessful Bolt app execution (status: {}, body: {})",
116+
boltResponse.getStatusCode(), boltResponse.getBody());
117+
return;
118+
}
119+
sendSocketModeResponse(client, gson, req, boltResponse);
120+
} catch (Exception e) {
121+
ErrorContext context = ErrorContext.builder().request(req.getBoltRequest()).exception(e).build();
122+
Response errorResponse = errorHandler.apply(context);
123+
if (errorResponse != null) {
124+
sendSocketModeResponse(client, gson, req, errorResponse);
125+
}
126+
} finally {
127+
long spentMillis = System.currentTimeMillis() - startMillis;
128+
log.debug("Response time: {} milliseconds", spentMillis);
129+
}
130+
}
131+
}
132+
133+
private static ExecutorService buildExecutorService(int concurrency) {
134+
return DaemonThreadExecutorServiceFactory.createDaemonThreadPoolExecutor(
135+
"slack-bolt-socket-mode", concurrency);
136+
}
137+
138+
// -------------------------------------------
139+
111140
public SocketModeApp(App app) throws IOException {
112141
this(System.getenv("SLACK_APP_TOKEN"), app);
113142
}
114143

144+
public SocketModeApp(App app, int concurrency) throws IOException {
145+
this(System.getenv("SLACK_APP_TOKEN"), app, concurrency);
146+
}
115147

116148
public SocketModeApp(String appToken, App app) throws IOException {
117149
this(appToken, DEFAULT_ERROR_HANDLER, app);
118150
}
119151

152+
public SocketModeApp(String appToken, App app, int concurrency) throws IOException {
153+
this(appToken, DEFAULT_ERROR_HANDLER, app, buildExecutorService(concurrency));
154+
}
155+
120156
public SocketModeApp(
121157
String appToken,
122158
Function<ErrorContext, Response> errorHandler,
123159
App app
124160
) throws IOException {
125-
this(buildSocketModeClientFactory(app, appToken, errorHandler), app);
161+
this(buildSocketModeClientFactory(app, appToken, errorHandler, null), app);
126162
}
127163

128164
public SocketModeApp(
129165
String appToken,
130166
App app,
131167
Function<ErrorContext, Response> errorHandler
132168
) throws IOException {
133-
this(buildSocketModeClientFactory(app, appToken, errorHandler), app);
169+
this(buildSocketModeClientFactory(app, appToken, errorHandler, null), app);
134170
}
135171

136172
public SocketModeApp(Supplier<SocketModeClient> clientFactory, App app) {
173+
this(clientFactory, app, null);
174+
}
175+
176+
177+
// intentionally private to avoid exposing the ExecutorService initialization
178+
private SocketModeApp(
179+
String appToken,
180+
Function<ErrorContext, Response> errorHandler,
181+
App app,
182+
ExecutorService executorService
183+
) throws IOException {
184+
this(buildSocketModeClientFactory(app, appToken, errorHandler, executorService), app, executorService);
185+
}
186+
187+
// intentionally private to avoid exposing the ExecutorService initialization
188+
private SocketModeApp(
189+
Supplier<SocketModeClient> clientFactory,
190+
App app,
191+
ExecutorService executorService
192+
) {
137193
this.clientFactory = clientFactory;
138194
this.app = app;
195+
this.executorService = executorService;
139196
}
140197

141198
/**
@@ -152,10 +209,9 @@ public SocketModeApp(SocketModeClient socketModeClient, App app) {
152209
this.client = socketModeClient;
153210
this.clientFactory = () -> socketModeClient;
154211
this.app = app;
212+
this.executorService = null;
155213
}
156214

157-
// -------------------------------------------
158-
159215
public void start() throws Exception {
160216
run(true);
161217
}
@@ -192,6 +248,16 @@ public void stop() throws Exception {
192248
public void close() throws Exception {
193249
this.stop();
194250
this.client = null;
251+
if (executorService != null) {
252+
for (Runnable runnable : executorService.shutdownNow()) {
253+
try {
254+
runnable.run();
255+
} catch (Exception e) {
256+
log.warn("Failed to run the remaining Runnable in SocketModeApp (error: {}, message: {})",
257+
e.getClass().getCanonicalName(), e.getMessage());
258+
}
259+
}
260+
}
195261
}
196262

197263
// -------------------------------------------
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package samples;
2+
3+
import com.slack.api.bolt.App;
4+
import com.slack.api.bolt.AppConfig;
5+
import com.slack.api.bolt.jakarta_socket_mode.SocketModeApp;
6+
import com.slack.api.model.event.MessageChangedEvent;
7+
import com.slack.api.model.event.MessageDeletedEvent;
8+
import com.slack.api.model.event.MessageEvent;
9+
import config.Constants;
10+
11+
public class ConcurrencyTestApp {
12+
13+
public static void main(String[] args) throws Exception {
14+
App app = new App(AppConfig.builder()
15+
.singleTeamBotToken(System.getenv(Constants.SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN))
16+
.build());
17+
18+
app.event(MessageEvent.class, (req, ctx) -> {
19+
// Without concurrency option, this time-consuming task slows the whole message processing mechanism down
20+
try {
21+
Thread.sleep(1000L);
22+
} catch (InterruptedException e) {
23+
throw new RuntimeException(e);
24+
}
25+
ctx.asyncClient().reactionsAdd(r -> r
26+
.channel(req.getEvent().getChannel())
27+
.name("eyes")
28+
.timestamp(req.getEvent().getTs())
29+
);
30+
return ctx.ack();
31+
});
32+
app.event(MessageChangedEvent.class, (req, ctx) -> ctx.ack());
33+
app.event(MessageDeletedEvent.class, (req, ctx) -> ctx.ack());
34+
35+
String appToken = System.getenv(Constants.SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN);
36+
// SocketModeApp socketModeApp = new SocketModeApp(appToken, app);
37+
SocketModeApp socketModeApp = new SocketModeApp(appToken, app, 10);
38+
socketModeApp.start();
39+
}
40+
}

0 commit comments

Comments
 (0)