Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public abstract class EnhancedRunnable implements Runnable {
private volatile Throwable error;
private final List<DoneCallback> doneCallbacks = new ArrayList<>();
private final List<DoneCallback> doneCallbacks = new CopyOnWriteArrayList<>();

public Throwable getError() {
return error;
Expand All @@ -16,16 +17,12 @@ public void setError(Throwable error) {
}

public void addDoneCallback(DoneCallback doneCallback) {
synchronized (doneCallbacks) {
doneCallbacks.add(doneCallback);
}
doneCallbacks.add(doneCallback);
}

public void invokeDoneCallbacks() {
synchronized (doneCallbacks) {
for (DoneCallback doneCallback : doneCallbacks) {
doneCallback.done(this);
}
for (DoneCallback doneCallback : doneCallbacks) {
doneCallback.done(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -13,13 +15,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class EventQueue {
public abstract class EventQueue implements AutoCloseable {

private static final Logger log = new TempLoggerWrapper(LoggerFactory.getLogger(EventQueue.class));

// TODO decide on a capacity
private static final int queueSize = 1000;

private final EventQueue parent;
// TODO decide on a capacity (or more appropriate queue data structures)
private final BlockingQueue<Event> queue = new ArrayBlockingQueue<Event>(1000);

private final BlockingQueue<Event> queue = new LinkedBlockingDeque<>();
private final Semaphore semaphore = new Semaphore(queueSize, true);
private volatile boolean closed = false;


Expand Down Expand Up @@ -47,6 +53,11 @@ public void enqueueEvent(Event event) {
return;
}
// Call toString() since for errors we don't really want the full stacktrace
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When catching an InterruptedException, it is a best practice to restore the interrupted status of the thread by calling Thread.currentThread().interrupt()1. This allows code higher up the call stack to be aware that an interruption occurred and handle it appropriately. Simply wrapping it in a RuntimeException loses this important information.

Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e);

Style Guide References

Footnotes

  1. Restore the interrupted status (link)

}
queue.add(event);
log.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
}
Expand All @@ -65,6 +76,7 @@ public Event dequeueEvent(int waitMilliSeconds) throws EventQueueClosedException
// Call toString() since for errors we don't really want the full stacktrace
log.debug("Dequeued event (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event);
}
semaphore.release();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The semaphore is released even if queue.poll() returns null (i.e., no event was dequeued). This will cause the semaphore's permit count to increase over time, effectively breaking the intended capacity limit of the queue. This could lead to unbounded memory usage and an OutOfMemoryError. The semaphore.release() call should only be made when an event has been successfully dequeued.

if (event != null) {
                    semaphore.release();
                }

return event;
}
try {
Expand All @@ -73,6 +85,7 @@ public Event dequeueEvent(int waitMilliSeconds) throws EventQueueClosedException
// Call toString() since for errors we don't really want the full stacktrace
log.debug("Dequeued event (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
}
semaphore.release();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The semaphore is released even if queue.poll(...) returns null because of a timeout. This will incorrectly increment the semaphore's permit count, defeating the queue's capacity limit and risking an OutOfMemoryError. The semaphore.release() call must be conditional on an event actually being dequeued.

if (event != null) {
                    semaphore.release();
                }

return event;
} catch (InterruptedException e) {
log.debug("Interrupted dequeue (waiting) {}", this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class InMemoryQueueManager implements QueueManager {
private final Map<String, EventQueue> queues = Collections.synchronizedMap(new HashMap<>());
private final ConcurrentMap<String, EventQueue> queues = new ConcurrentHashMap<>();

@Override
public void add(String taskId, EventQueue queue) {
synchronized (queues) {
if (queues.containsKey(taskId)) {
throw new TaskQueueExistsException();
}
queues.put(taskId, queue);
EventQueue existing = queues.putIfAbsent(taskId, queue);
if (existing != null) {
throw new TaskQueueExistsException();
}
}

Expand All @@ -27,36 +27,29 @@ public EventQueue get(String taskId) {

@Override
public EventQueue tap(String taskId) {
synchronized (taskId) {
EventQueue queue = queues.get(taskId);
if (queue == null) {
return queue;
}
return queue.tap();
}
EventQueue queue = queues.get(taskId);
return queue == null ? null : queue.tap();
}

@Override
public void close(String taskId) {
synchronized (queues) {
EventQueue existing = queues.remove(taskId);
if (existing == null) {
throw new NoTaskQueueException();
}
EventQueue existing = queues.remove(taskId);
if (existing == null) {
throw new NoTaskQueueException();
}
}

@Override
public EventQueue createOrTap(String taskId) {
synchronized (queues) {
EventQueue queue = queues.get(taskId);
if (queue != null) {
return queue.tap();
}
queue = EventQueue.create();
queues.put(taskId, queue);
return queue;

EventQueue existing = queues.get(taskId);
EventQueue newQueue = null;
if (existing == null) {
newQueue = EventQueue.create();
// Make sure an existing queue has not been added in the meantime
existing = queues.putIfAbsent(taskId, newQueue);
}
return existing == null ? newQueue : existing.tap();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -61,7 +63,7 @@ public class DefaultRequestHandler implements RequestHandler {
private final PushNotifier pushNotifier;
private final Supplier<RequestContext.Builder> requestContextBuilder;

private final Map<String, CompletableFuture<Void>> runningAgents = Collections.synchronizedMap(new HashMap<>());
private final ConcurrentMap<String, CompletableFuture<Void>> runningAgents = new ConcurrentHashMap<>();

private final Executor executor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import jakarta.enterprise.context.ApplicationScoped;

Expand All @@ -11,7 +13,7 @@
@ApplicationScoped
public class InMemoryTaskStore implements TaskStore {

private final Map<String, Task> tasks = Collections.synchronizedMap(new HashMap<>());
private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();

@Override
public void save(Task task) {
Expand Down