@@ -44,6 +44,8 @@ public class DefaultEventProcessor implements EventProcessor {
4444
4545 final EventRepository eventRepository = new EventRepository ();
4646
47+ final List <EventAction > actions = new ArrayList <>(EVENT_BATCH_HANDLE_SIZE );
48+
4749 FPContext context ;
4850
4951 private static final String LOG_SENDER_ERROR = "Unexpected error from event sender" ;
@@ -97,7 +99,6 @@ public void shutdown() {
9799
98100 private void handleEvent (FPContext context , BlockingQueue <EventAction > eventQueue ,
99101 EventRepository eventRepository ) {
100- List <EventAction > actions = new ArrayList <>(EVENT_BATCH_HANDLE_SIZE );
101102 while (!closed .get () || !eventQueue .isEmpty ()) {
102103 try {
103104 actions .clear ();
@@ -125,10 +126,10 @@ private void handleEvent(FPContext context, BlockingQueue<EventAction> eventQueu
125126 }
126127
127128 private void doShutdown () {
129+ flush ();
128130 if (closed .compareAndSet (false , true )) {
129-
130131 try {
131- processFlush ( context , eventRepository );
132+ waitTaskDone ( 2 , TimeUnit . SECONDS );
132133 scheduler .awaitTermination (1000 , TimeUnit .MILLISECONDS );
133134 executor .awaitTermination (2000 , TimeUnit .MILLISECONDS );
134135 } catch (InterruptedException e ) {
@@ -138,6 +139,17 @@ private void doShutdown() {
138139 }
139140 }
140141
142+ private void waitTaskDone (long timeout , TimeUnit unit ) {
143+ long startNanos = System .nanoTime ();
144+ while (!eventQueue .isEmpty () && !actions .isEmpty ()) {
145+ long nanos = unit .toNanos (timeout );
146+ if (System .nanoTime () - startNanos > nanos ) {
147+ break ;
148+ }
149+ }
150+ }
151+
152+
141153 private void processEvent (Event event , EventRepository eventRepository ) {
142154 eventRepository .add (event );
143155 }
0 commit comments