@@ -37,17 +37,6 @@ public abstract class EventQueue implements AutoCloseable {
3737 public static final int DEFAULT_QUEUE_SIZE = 1000 ;
3838
3939 private final int queueSize ;
40-
41- /**
42- * Internal blocking queue for storing event queue items.
43- */
44- protected final BlockingQueue <EventQueueItem > queue = new LinkedBlockingDeque <>();
45-
46- /**
47- * Semaphore for backpressure control, limiting the number of pending events.
48- */
49- protected final Semaphore semaphore ;
50-
5140 private volatile boolean closed = false ;
5241
5342 /**
@@ -68,7 +57,6 @@ protected EventQueue(int queueSize) {
6857 throw new IllegalArgumentException ("Queue size must be greater than 0" );
6958 }
7059 this .queueSize = queueSize ;
71- this .semaphore = new Semaphore (queueSize , true );
7260 LOGGER .trace ("Creating {} with queue size: {}" , this , queueSize );
7361 }
7462
@@ -337,6 +325,7 @@ protected void doClose(boolean immediate) {
337325
338326 static class MainQueue extends EventQueue {
339327 private final List <ChildQueue > children = new CopyOnWriteArrayList <>();
328+ protected final Semaphore semaphore ;
340329 private final CountDownLatch pollingStartedLatch = new CountDownLatch (1 );
341330 private final AtomicBoolean pollingStarted = new AtomicBoolean (false );
342331 private final @ Nullable EventEnqueueHook enqueueHook ;
@@ -352,6 +341,7 @@ static class MainQueue extends EventQueue {
352341 @ Nullable TaskStateProvider taskStateProvider ,
353342 @ Nullable MainEventBus mainEventBus ) {
354343 super (queueSize );
344+ this .semaphore = new Semaphore (queueSize , true );
355345 this .enqueueHook = hook ;
356346 this .taskId = taskId ;
357347 this .onCloseCallbacks = List .copyOf (onCloseCallbacks ); // Defensive copy
@@ -570,8 +560,12 @@ private void internalEnqueueItem(EventQueueItem item) {
570560 LOGGER .warn ("ChildQueue is closed. Event will not be enqueued. {} {}" , this , event );
571561 return ;
572562 }
573- queue .add (item );
574- LOGGER .debug ("Enqueued event {} {}" , event instanceof Throwable ? event .toString () : event , this );
563+ if (!queue .offer (item )) {
564+ LOGGER .warn ("ChildQueue {} is full. Closing immediately." , this );
565+ close (true ); // immediate close
566+ } else {
567+ LOGGER .debug ("Enqueued event {} {}" , event instanceof Throwable ? event .toString () : event , this );
568+ }
575569 }
576570
577571 @ Override
@@ -587,7 +581,6 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
587581 if (item != null ) {
588582 Event event = item .getEvent ();
589583 LOGGER .debug ("Dequeued event item (no wait) {} {}" , this , event instanceof Throwable ? event .toString () : event );
590- // Note: Semaphore is managed by parent MainQueue, not released here
591584 }
592585 return item ;
593586 }
@@ -597,7 +590,6 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
597590 if (item != null ) {
598591 Event event = item .getEvent ();
599592 LOGGER .debug ("Dequeued event item (waiting) {} {}" , this , event instanceof Throwable ? event .toString () : event );
600- // Note: Semaphore is managed by parent MainQueue, not released here
601593 } else {
602594 LOGGER .trace ("Dequeue timeout (null) from ChildQueue {}" , System .identityHashCode (this ));
603595 }
@@ -640,8 +632,6 @@ protected void doClose(boolean immediate) {
640632 // Immediate close: clear pending events from local queue
641633 int clearedCount = queue .size ();
642634 queue .clear ();
643- // Release semaphore permits for cleared events to prevent deadlock
644- semaphore .release (clearedCount );
645635 LOGGER .debug ("Cleared {} events from ChildQueue for immediate close: {}" , clearedCount , this );
646636 }
647637 // For graceful close, let the queue drain naturally through normal consumption
0 commit comments