Skip to content

Commit 07b0e57

Browse files
authored
Fix race conditions by adding atomicity (#877)
1 parent 58181ce commit 07b0e57

File tree

2 files changed

+13
-24
lines changed

2 files changed

+13
-24
lines changed

slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncRateLimitQueue.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,7 @@ public class AsyncRateLimitQueue extends RateLimitQueue<
2121
private static final ConcurrentMap<String, ConcurrentMap<String, AsyncRateLimitQueue>> ALL_QUEUES = new ConcurrentHashMap<>();
2222

2323
private static ConcurrentMap<String, AsyncRateLimitQueue> getInstance(String executorName) {
24-
ConcurrentMap<String, AsyncRateLimitQueue> teamIdToQueue = ALL_QUEUES.get(executorName);
25-
if (teamIdToQueue == null) {
26-
teamIdToQueue = new ConcurrentHashMap<>();
27-
ALL_QUEUES.put(executorName, teamIdToQueue);
28-
}
29-
return teamIdToQueue;
24+
return ALL_QUEUES.computeIfAbsent(executorName, key -> new ConcurrentHashMap<>());
3025
}
3126

3227
private AsyncMethodsRateLimiter rateLimiter; // intentionally mutable
@@ -51,17 +46,18 @@ public static AsyncRateLimitQueue getOrCreate(MethodsConfig config, String teamI
5146
if (teamId == null) {
5247
throw new IllegalArgumentException("`teamId` is required");
5348
}
49+
5450
ConcurrentMap<String, AsyncRateLimitQueue> teamIdToQueue = getInstance(config.getExecutorName());
55-
AsyncRateLimitQueue queue = teamIdToQueue.get(teamId);
56-
if (queue != null && queue.getRateLimiter().getMetricsDatastore() != config.getMetricsDatastore()) {
57-
// As the metrics datastore has been changed, we should replace the executor
58-
queue.setRateLimiter(new AsyncMethodsRateLimiter(config));
59-
}
60-
if (queue == null) {
61-
queue = new AsyncRateLimitQueue(config);
62-
teamIdToQueue.put(teamId, queue);
63-
}
64-
return queue;
51+
52+
teamIdToQueue.computeIfPresent(teamId, (key, value) -> {
53+
if (value.getRateLimiter().getMetricsDatastore() != config.getMetricsDatastore()) {
54+
value.setRateLimiter(new AsyncMethodsRateLimiter(config));
55+
}
56+
57+
return value;
58+
});
59+
60+
return teamIdToQueue.computeIfAbsent(teamId, key -> new AsyncRateLimitQueue(config));
6561
}
6662

6763
@Data

slack-api-client/src/main/java/com/slack/api/rate_limits/queue/RateLimitQueue.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,7 @@ public abstract class RateLimitQueue<SUPPLIER, MSG extends QueueMessage> {
1818
protected final ConcurrentMap<String, LinkedBlockingQueue<MSG>> methodNameToActiveQueue = new ConcurrentHashMap<>();
1919

2020
protected LinkedBlockingQueue<MSG> getOrCreateActiveQueue(String methodName) {
21-
LinkedBlockingQueue<MSG> queue = methodNameToActiveQueue.get(methodName);
22-
if (queue != null) {
23-
return queue;
24-
} else {
25-
LinkedBlockingQueue<MSG> newQueue = new LinkedBlockingQueue<>();
26-
methodNameToActiveQueue.putIfAbsent(methodName, newQueue);
27-
return newQueue;
28-
}
21+
return methodNameToActiveQueue.computeIfAbsent(methodName, key -> new LinkedBlockingQueue<>());
2922
}
3023

3124
public synchronized SUPPLIER dequeueIfReady(

0 commit comments

Comments
 (0)