1616import org .slf4j .Logger ;
1717import org .slf4j .LoggerFactory ;
1818
19+ /**
20+ * Abstract base class for event queues that manage task event streaming.
21+ * <p>
22+ * An EventQueue provides a thread-safe mechanism for enqueueing and dequeueing events
23+ * related to task execution. It supports backpressure through semaphore-based throttling
24+ * and hierarchical queue structures via MainQueue and ChildQueue implementations.
25+ * </p>
26+ * <p>
27+ * Use {@link #builder()} to create configured instances or extend MainQueue/ChildQueue directly.
28+ * </p>
29+ */
1930public abstract class EventQueue implements AutoCloseable {
2031
2132 private static final Logger LOGGER = LoggerFactory .getLogger (EventQueue .class );
2233
34+ /**
35+ * Default maximum queue size for event queues.
36+ */
2337 public static final int DEFAULT_QUEUE_SIZE = 1000 ;
2438
2539 private final int queueSize ;
40+ /**
41+ * Internal blocking queue for storing event queue items.
42+ */
2643 protected final BlockingQueue <EventQueueItem > queue = new LinkedBlockingDeque <>();
44+ /**
45+ * Semaphore for backpressure control, limiting the number of pending events.
46+ */
2747 protected final Semaphore semaphore ;
2848 private volatile boolean closed = false ;
2949
50+ /**
51+ * Creates an EventQueue with the default queue size.
52+ */
3053 protected EventQueue () {
3154 this (DEFAULT_QUEUE_SIZE );
3255 }
3356
57+ /**
58+ * Creates an EventQueue with the specified queue size.
59+ *
60+ * @param queueSize the maximum number of events that can be queued
61+ * @throws IllegalArgumentException if queueSize is less than or equal to 0
62+ */
3463 protected EventQueue (int queueSize ) {
3564 if (queueSize <= 0 ) {
3665 throw new IllegalArgumentException ("Queue size must be greater than 0" );
@@ -40,6 +69,11 @@ protected EventQueue(int queueSize) {
4069 LOGGER .trace ("Creating {} with queue size: {}" , this , queueSize );
4170 }
4271
72+ /**
73+ * Creates an EventQueue as a child of the specified parent queue.
74+ *
75+ * @param parent the parent event queue
76+ */
4377 protected EventQueue (EventQueue parent ) {
4478 this (DEFAULT_QUEUE_SIZE );
4579 LOGGER .trace ("Creating {}, parent: {}" , this , parent );
@@ -49,40 +83,82 @@ static EventQueueBuilder builder() {
4983 return new EventQueueBuilder ();
5084 }
5185
86+ /**
87+ * Builder for creating configured EventQueue instances.
88+ * <p>
89+ * Supports configuration of queue size, enqueue hooks, task association,
90+ * close callbacks, and task state providers.
91+ * </p>
92+ */
5293 public static class EventQueueBuilder {
5394 private int queueSize = DEFAULT_QUEUE_SIZE ;
5495 private @ Nullable EventEnqueueHook hook ;
5596 private @ Nullable String taskId ;
5697 private List <Runnable > onCloseCallbacks = new java .util .ArrayList <>();
5798 private @ Nullable TaskStateProvider taskStateProvider ;
5899
100+ /**
101+ * Sets the maximum queue size.
102+ *
103+ * @param queueSize the maximum number of events that can be queued
104+ * @return this builder
105+ */
59106 public EventQueueBuilder queueSize (int queueSize ) {
60107 this .queueSize = queueSize ;
61108 return this ;
62109 }
63110
111+ /**
112+ * Sets the enqueue hook for event replication or logging.
113+ *
114+ * @param hook the hook to be invoked when items are enqueued
115+ * @return this builder
116+ */
64117 public EventQueueBuilder hook (EventEnqueueHook hook ) {
65118 this .hook = hook ;
66119 return this ;
67120 }
68121
122+ /**
123+ * Associates this queue with a specific task ID.
124+ *
125+ * @param taskId the task identifier
126+ * @return this builder
127+ */
69128 public EventQueueBuilder taskId (String taskId ) {
70129 this .taskId = taskId ;
71130 return this ;
72131 }
73132
133+ /**
134+ * Adds a callback to be executed when the queue is closed.
135+ *
136+ * @param onCloseCallback the callback to execute on close
137+ * @return this builder
138+ */
74139 public EventQueueBuilder addOnCloseCallback (Runnable onCloseCallback ) {
75140 if (onCloseCallback != null ) {
76141 this .onCloseCallbacks .add (onCloseCallback );
77142 }
78143 return this ;
79144 }
80145
146+ /**
147+ * Sets the task state provider for tracking task finalization.
148+ *
149+ * @param taskStateProvider the task state provider
150+ * @return this builder
151+ */
81152 public EventQueueBuilder taskStateProvider (TaskStateProvider taskStateProvider ) {
82153 this .taskStateProvider = taskStateProvider ;
83154 return this ;
84155 }
85156
157+ /**
158+ * Builds and returns the configured EventQueue.
159+ *
160+ * @return a new MainQueue instance
161+ */
86162 public EventQueue build () {
87163 if (hook != null || !onCloseCallbacks .isEmpty () || taskStateProvider != null ) {
88164 return new MainQueue (queueSize , hook , taskId , onCloseCallbacks , taskStateProvider );
@@ -92,18 +168,48 @@ public EventQueue build() {
92168 }
93169 }
94170
171+ /**
172+ * Returns the configured queue size.
173+ *
174+ * @return the maximum number of events that can be queued
175+ */
95176 public int getQueueSize () {
96177 return queueSize ;
97178 }
98179
180+ /**
181+ * Waits for the queue poller to start consuming events.
182+ * This method blocks until signaled by {@link #signalQueuePollerStarted()}.
183+ *
184+ * @throws InterruptedException if the thread is interrupted while waiting
185+ */
99186 public abstract void awaitQueuePollerStart () throws InterruptedException ;
100187
188+ /**
189+ * Signals that the queue poller has started consuming events.
190+ * This unblocks any threads waiting in {@link #awaitQueuePollerStart()}.
191+ */
101192 public abstract void signalQueuePollerStarted ();
102193
194+ /**
195+ * Enqueues an event for processing.
196+ *
197+ * @param event the event to enqueue
198+ */
103199 public void enqueueEvent (Event event ) {
104200 enqueueItem (new LocalEventQueueItem (event ));
105201 }
106202
203+ /**
204+ * Enqueues an event queue item for processing.
205+ * <p>
206+ * This method will block if the queue is full, waiting to acquire a semaphore permit.
207+ * If the queue is closed, the event will not be enqueued and a warning will be logged.
208+ * </p>
209+ *
210+ * @param item the event queue item to enqueue
211+ * @throws RuntimeException if interrupted while waiting to acquire the semaphore
212+ */
107213 public void enqueueItem (EventQueueItem item ) {
108214 Event event = item .getEvent ();
109215 if (closed ) {
@@ -121,6 +227,16 @@ public void enqueueItem(EventQueueItem item) {
121227 LOGGER .debug ("Enqueued event {} {}" , event instanceof Throwable ? event .toString () : event , this );
122228 }
123229
230+ /**
231+ * Creates a child queue that shares events with this queue.
232+ * <p>
233+ * For MainQueue: creates a ChildQueue that receives all events enqueued to the parent.
234+ * For ChildQueue: throws IllegalStateException (only MainQueue can be tapped).
235+ * </p>
236+ *
237+ * @return a new ChildQueue instance
238+ * @throws IllegalStateException if called on a ChildQueue
239+ */
124240 public abstract EventQueue tap ();
125241
126242 /**
@@ -172,12 +288,24 @@ public void enqueueItem(EventQueueItem item) {
172288 }
173289 }
174290
291+ /**
292+ * Placeholder method for task completion notification.
293+ * Currently not used as BlockingQueue.poll()/take() automatically remove events.
294+ */
175295 public void taskDone () {
176296 // TODO Not sure if needed yet. BlockingQueue.poll()/.take() remove the events.
177297 }
178298
299+ /**
300+ * Closes this event queue gracefully, allowing pending events to be consumed.
301+ */
179302 public abstract void close ();
180303
304+ /**
305+ * Closes this event queue with control over immediate shutdown.
306+ *
307+ * @param immediate if true, clears all pending events immediately; if false, allows graceful drain
308+ */
181309 public abstract void close (boolean immediate );
182310
183311 /**
@@ -191,14 +319,28 @@ public void taskDone() {
191319 */
192320 public abstract void close (boolean immediate , boolean notifyParent );
193321
322+ /**
323+ * Checks if this queue has been closed.
324+ *
325+ * @return true if the queue is closed, false otherwise
326+ */
194327 public boolean isClosed () {
195328 return closed ;
196329 }
197330
331+ /**
332+ * Internal method to close the queue gracefully.
333+ * Delegates to {@link #doClose(boolean)} with immediate=false.
334+ */
198335 protected void doClose () {
199336 doClose (false );
200337 }
201338
339+ /**
340+ * Internal method to close the queue with control over immediate shutdown.
341+ *
342+ * @param immediate if true, clears all pending events immediately; if false, allows graceful drain
343+ */
202344 protected void doClose (boolean immediate ) {
203345 synchronized (this ) {
204346 if (closed ) {
0 commit comments