@@ -21,7 +21,6 @@ public abstract class EventQueue implements AutoCloseable {
2121 public static final int DEFAULT_QUEUE_SIZE = 1000 ;
2222
2323 private final int queueSize ;
24- protected final BlockingQueue <EventQueueItem > queue = new LinkedBlockingDeque <>();
2524 protected final Semaphore semaphore ;
2625 private volatile boolean closed = false ;
2726
@@ -43,8 +42,8 @@ protected EventQueue(EventQueue parent) {
4342 LOGGER .trace ("Creating {}, parent: {}" , this , parent );
4443 }
4544
46- static EventQueueBuilder builder () {
47- return new EventQueueBuilder ();
45+ static EventQueueBuilder builder (MainEventBus mainEventBus ) {
46+ return new EventQueueBuilder (). mainEventBus ( mainEventBus ) ;
4847 }
4948
5049 public static class EventQueueBuilder {
@@ -88,11 +87,11 @@ public EventQueueBuilder mainEventBus(MainEventBus mainEventBus) {
8887 }
8988
9089 public EventQueue build () {
91- if (hook != null || !onCloseCallbacks .isEmpty () || taskStateProvider != null || mainEventBus != null ) {
92- return new MainQueue (queueSize , hook , taskId , onCloseCallbacks , taskStateProvider , mainEventBus );
93- } else {
94- return new MainQueue (queueSize );
90+ // MainEventBus is now REQUIRED - enforce single architectural path
91+ if (mainEventBus == null ) {
92+ throw new IllegalStateException ("MainEventBus is required for EventQueue creation" );
9593 }
94+ return new MainQueue (queueSize , hook , taskId , onCloseCallbacks , taskStateProvider , mainEventBus );
9695 }
9796 }
9897
@@ -108,22 +107,7 @@ public void enqueueEvent(Event event) {
108107 enqueueItem (new LocalEventQueueItem (event ));
109108 }
110109
111- public void enqueueItem (EventQueueItem item ) {
112- Event event = item .getEvent ();
113- if (closed ) {
114- LOGGER .warn ("Queue is closed. Event will not be enqueued. {} {}" , this , event );
115- return ;
116- }
117- // Call toString() since for errors we don't really want the full stacktrace
118- try {
119- semaphore .acquire ();
120- } catch (InterruptedException e ) {
121- Thread .currentThread ().interrupt ();
122- throw new RuntimeException ("Unable to acquire the semaphore to enqueue the event" , e );
123- }
124- queue .add (item );
125- LOGGER .debug ("Enqueued event {} {}" , event instanceof Throwable ? event .toString () : event , this );
126- }
110+ public abstract void enqueueItem (EventQueueItem item );
127111
128112 public abstract EventQueue tap ();
129113
@@ -133,53 +117,32 @@ public void enqueueItem(EventQueueItem item) {
133117 * This method returns the full EventQueueItem wrapper, allowing callers to check
134118 * metadata like whether the event is replicated via {@link EventQueueItem#isReplicated()}.
135119 * </p>
120+ * <p>
121+ * Note: MainQueue does not support dequeue operations - only ChildQueues can be consumed.
122+ * </p>
136123 *
137124 * @param waitMilliSeconds the maximum time to wait in milliseconds
138125 * @return the EventQueueItem, or null if timeout occurs
139126 * @throws EventQueueClosedException if the queue is closed and empty
127+ * @throws UnsupportedOperationException if called on MainQueue
140128 */
141- public EventQueueItem dequeueEventItem (int waitMilliSeconds ) throws EventQueueClosedException {
142- if (closed && queue .isEmpty ()) {
143- LOGGER .debug ("Queue is closed, and empty. Sending termination message. {}" , this );
144- throw new EventQueueClosedException ();
145- }
146- try {
147- if (waitMilliSeconds <= 0 ) {
148- EventQueueItem item = queue .poll ();
149- if (item != null ) {
150- Event event = item .getEvent ();
151- // Call toString() since for errors we don't really want the full stacktrace
152- LOGGER .debug ("Dequeued event item (no wait) {} {}" , this , event instanceof Throwable ? event .toString () : event );
153- semaphore .release ();
154- }
155- return item ;
156- }
157- try {
158- LOGGER .trace ("Polling queue {} (wait={}ms)" , System .identityHashCode (this ), waitMilliSeconds );
159- EventQueueItem item = queue .poll (waitMilliSeconds , TimeUnit .MILLISECONDS );
160- if (item != null ) {
161- Event event = item .getEvent ();
162- // Call toString() since for errors we don't really want the full stacktrace
163- LOGGER .debug ("Dequeued event item (waiting) {} {}" , this , event instanceof Throwable ? event .toString () : event );
164- semaphore .release ();
165- } else {
166- LOGGER .trace ("Dequeue timeout (null) from queue {}" , System .identityHashCode (this ));
167- }
168- return item ;
169- } catch (InterruptedException e ) {
170- LOGGER .debug ("Interrupted dequeue (waiting) {}" , this );
171- Thread .currentThread ().interrupt ();
172- return null ;
173- }
174- } finally {
175- signalQueuePollerStarted ();
176- }
177- }
129+ public abstract EventQueueItem dequeueEventItem (int waitMilliSeconds ) throws EventQueueClosedException ;
178130
179131 public void taskDone () {
180132 // TODO Not sure if needed yet. BlockingQueue.poll()/.take() remove the events.
181133 }
182134
135+ /**
136+ * Returns the current size of the queue.
137+ * <p>
138+ * For MainQueue: returns the size of the MainEventBus queue (events pending persistence/distribution).
139+ * For ChildQueue: returns the size of the local consumption queue.
140+ * </p>
141+ *
142+ * @return the number of events currently in the queue
143+ */
144+ public abstract int size ();
145+
183146 public abstract void close ();
184147
185148 public abstract void close (boolean immediate );
@@ -211,13 +174,7 @@ protected void doClose(boolean immediate) {
211174 LOGGER .debug ("Closing {} (immediate={})" , this , immediate );
212175 closed = true ;
213176 }
214-
215- if (immediate ) {
216- // Immediate close: clear pending events
217- queue .clear ();
218- LOGGER .debug ("Cleared queue for immediate close: {}" , this );
219- }
220- // For graceful close, let the queue drain naturally through normal consumption
177+ // Subclasses handle immediate close logic (e.g., ChildQueue clears its local queue)
221178 }
222179
223180 static class MainQueue extends EventQueue {
@@ -230,62 +187,15 @@ static class MainQueue extends EventQueue {
230187 private final TaskStateProvider taskStateProvider ;
231188 private final MainEventBus mainEventBus ;
232189
233- MainQueue () {
234- super ();
235- this .enqueueHook = null ;
236- this .taskId = null ;
237- this .onCloseCallbacks = List .of ();
238- this .taskStateProvider = null ;
239- this .mainEventBus = null ;
240- }
241-
242- MainQueue (int queueSize ) {
243- super (queueSize );
244- this .enqueueHook = null ;
245- this .taskId = null ;
246- this .onCloseCallbacks = List .of ();
247- this .taskStateProvider = null ;
248- this .mainEventBus = null ;
249- }
250-
251- MainQueue (EventEnqueueHook hook ) {
252- super ();
253- this .enqueueHook = hook ;
254- this .taskId = null ;
255- this .onCloseCallbacks = List .of ();
256- this .taskStateProvider = null ;
257- this .mainEventBus = null ;
258- }
259-
260- MainQueue (int queueSize , EventEnqueueHook hook ) {
261- super (queueSize );
262- this .enqueueHook = hook ;
263- this .taskId = null ;
264- this .onCloseCallbacks = List .of ();
265- this .taskStateProvider = null ;
266- this .mainEventBus = null ;
267- }
268-
269- MainQueue (int queueSize , EventEnqueueHook hook , String taskId , List <Runnable > onCloseCallbacks , TaskStateProvider taskStateProvider ) {
270- super (queueSize );
271- this .enqueueHook = hook ;
272- this .taskId = taskId ;
273- this .onCloseCallbacks = List .copyOf (onCloseCallbacks ); // Defensive copy
274- this .taskStateProvider = taskStateProvider ;
275- this .mainEventBus = null ;
276- LOGGER .debug ("Created MainQueue for task {} with {} onClose callbacks and TaskStateProvider: {}" ,
277- taskId , onCloseCallbacks .size (), taskStateProvider != null );
278- }
279-
280190 MainQueue (int queueSize , EventEnqueueHook hook , String taskId , List <Runnable > onCloseCallbacks , TaskStateProvider taskStateProvider , MainEventBus mainEventBus ) {
281191 super (queueSize );
282192 this .enqueueHook = hook ;
283193 this .taskId = taskId ;
284194 this .onCloseCallbacks = List .copyOf (onCloseCallbacks ); // Defensive copy
285195 this .taskStateProvider = taskStateProvider ;
286- this .mainEventBus = mainEventBus ;
287- LOGGER .debug ("Created MainQueue for task {} with {} onClose callbacks, TaskStateProvider: {}, MainEventBus: {} " ,
288- taskId , onCloseCallbacks .size (), taskStateProvider != null , mainEventBus != null );
196+ this .mainEventBus = java . util . Objects . requireNonNull ( mainEventBus , "MainEventBus is required" ) ;
197+ LOGGER .debug ("Created MainQueue for task {} with {} onClose callbacks, TaskStateProvider: {}, MainEventBus configured " ,
198+ taskId , onCloseCallbacks .size (), taskStateProvider != null );
289199 }
290200
291201 public EventQueue tap () {
@@ -302,6 +212,17 @@ public int getChildCount() {
302212 return children .size ();
303213 }
304214
215+ @ Override
216+ public EventQueueItem dequeueEventItem (int waitMilliSeconds ) throws EventQueueClosedException {
217+ throw new UnsupportedOperationException ("MainQueue cannot be consumed directly - use tap() to create a ChildQueue for consumption" );
218+ }
219+
220+ @ Override
221+ public int size () {
222+ // Return size of MainEventBus queue (events pending persistence/distribution)
223+ return mainEventBus .size ();
224+ }
225+
305226 @ Override
306227 public void enqueueItem (EventQueueItem item ) {
307228 // MainQueue must accept events even when closed to support:
@@ -320,19 +241,13 @@ public void enqueueItem(EventQueueItem item) {
320241 throw new RuntimeException ("Unable to acquire the semaphore to enqueue the event" , e );
321242 }
322243
323- // Add to this MainQueue's internal queue
324- queue .add (item );
325244 LOGGER .debug ("Enqueued event {} {}" , event instanceof Throwable ? event .toString () : event , this );
326245
327246 // Submit to MainEventBus for centralized persistence + distribution
328- if (mainEventBus != null && taskId != null ) {
329- mainEventBus .submit (taskId , this , item );
330- } else {
331- // This should not happen in properly configured systems
332- LOGGER .error ("MainEventBus not configured for task {} - events will NOT be distributed to children!" , taskId );
333- }
247+ // MainEventBus is guaranteed non-null by constructor requirement
248+ mainEventBus .submit (taskId , this , item );
334249
335- // Trigger replication hook if configured (KEEP for inter-process replication)
250+ // Trigger replication hook if configured (for inter-process replication)
336251 if (enqueueHook != null ) {
337252 enqueueHook .onEnqueue (item );
338253 }
@@ -459,6 +374,7 @@ public void close(boolean immediate, boolean notifyParent) {
459374
460375 static class ChildQueue extends EventQueue {
461376 private final MainQueue parent ;
377+ private final BlockingQueue <EventQueueItem > queue = new LinkedBlockingDeque <>();
462378
463379 public ChildQueue (MainQueue parent ) {
464380 this .parent = parent ;
@@ -469,15 +385,77 @@ public void enqueueEvent(Event event) {
469385 parent .enqueueEvent (event );
470386 }
471387
388+ @ Override
389+ public void enqueueItem (EventQueueItem item ) {
390+ // ChildQueue delegates writes to parent MainQueue
391+ parent .enqueueItem (item );
392+ }
393+
472394 private void internalEnqueueItem (EventQueueItem item ) {
473- super .enqueueItem (item );
395+ // Internal method called by MainEventBusProcessor to add to local queue
396+ Event event = item .getEvent ();
397+ if (isClosed ()) {
398+ LOGGER .warn ("ChildQueue is closed. Event will not be enqueued. {} {}" , this , event );
399+ return ;
400+ }
401+ try {
402+ semaphore .acquire ();
403+ } catch (InterruptedException e ) {
404+ Thread .currentThread ().interrupt ();
405+ throw new RuntimeException ("Unable to acquire the semaphore to enqueue the event" , e );
406+ }
407+ queue .add (item );
408+ LOGGER .debug ("Enqueued event {} {}" , event instanceof Throwable ? event .toString () : event , this );
409+ }
410+
411+ @ Override
412+ public EventQueueItem dequeueEventItem (int waitMilliSeconds ) throws EventQueueClosedException {
413+ if (isClosed () && queue .isEmpty ()) {
414+ LOGGER .debug ("ChildQueue is closed, and empty. Sending termination message. {}" , this );
415+ throw new EventQueueClosedException ();
416+ }
417+ try {
418+ if (waitMilliSeconds <= 0 ) {
419+ EventQueueItem item = queue .poll ();
420+ if (item != null ) {
421+ Event event = item .getEvent ();
422+ LOGGER .debug ("Dequeued event item (no wait) {} {}" , this , event instanceof Throwable ? event .toString () : event );
423+ semaphore .release ();
424+ }
425+ return item ;
426+ }
427+ try {
428+ LOGGER .trace ("Polling ChildQueue {} (wait={}ms)" , System .identityHashCode (this ), waitMilliSeconds );
429+ EventQueueItem item = queue .poll (waitMilliSeconds , TimeUnit .MILLISECONDS );
430+ if (item != null ) {
431+ Event event = item .getEvent ();
432+ LOGGER .debug ("Dequeued event item (waiting) {} {}" , this , event instanceof Throwable ? event .toString () : event );
433+ semaphore .release ();
434+ } else {
435+ LOGGER .trace ("Dequeue timeout (null) from ChildQueue {}" , System .identityHashCode (this ));
436+ }
437+ return item ;
438+ } catch (InterruptedException e ) {
439+ LOGGER .debug ("Interrupted dequeue (waiting) {}" , this );
440+ Thread .currentThread ().interrupt ();
441+ return null ;
442+ }
443+ } finally {
444+ signalQueuePollerStarted ();
445+ }
474446 }
475447
476448 @ Override
477449 public EventQueue tap () {
478450 throw new IllegalStateException ("Can only tap the main queue" );
479451 }
480452
453+ @ Override
454+ public int size () {
455+ // Return size of local consumption queue
456+ return queue .size ();
457+ }
458+
481459 @ Override
482460 public void awaitQueuePollerStart () throws InterruptedException {
483461 parent .awaitQueuePollerStart ();
@@ -488,6 +466,17 @@ public void signalQueuePollerStarted() {
488466 parent .signalQueuePollerStarted ();
489467 }
490468
469+ @ Override
470+ protected void doClose (boolean immediate ) {
471+ super .doClose (immediate ); // Sets closed flag
472+ if (immediate ) {
473+ // Immediate close: clear pending events from local queue
474+ queue .clear ();
475+ LOGGER .debug ("Cleared ChildQueue for immediate close: {}" , this );
476+ }
477+ // For graceful close, let the queue drain naturally through normal consumption
478+ }
479+
491480 @ Override
492481 public void close () {
493482 close (false );
0 commit comments