2323 * THE SOFTWARE.
2424 */
2525package com .iluwatar .logaggregation ;
26-
26+ import java . util . concurrent . BlockingQueue ;
2727import java .util .concurrent .ConcurrentLinkedQueue ;
28- import java .util .concurrent .ExecutorService ;
28+ import java .util .concurrent .LinkedBlockingQueue ;
29+ import java .util .concurrent .ScheduledExecutorService ;
2930import java .util .concurrent .Executors ;
3031import java .util .concurrent .TimeUnit ;
3132import java .util .concurrent .atomic .AtomicInteger ;
33+ import java .util .concurrent .CountDownLatch ;
34+ import java .util .ArrayList ;
35+ import java .util .List ;
3236import lombok .extern .slf4j .Slf4j ;
3337
3438/**
4145public class LogAggregator {
4246
4347 private static final int BUFFER_THRESHOLD = 3 ;
48+ private static final int FLUSH_INTERVAL_SECONDS = 5 ;
49+ private static final int SHUTDOWN_TIMEOUT_SECONDS = 10 ;
50+
4451 private final CentralLogStore centralLogStore ;
4552 private final ConcurrentLinkedQueue <LogEntry > buffer = new ConcurrentLinkedQueue <>();
4653 private final LogLevel minLogLevel ;
4754 private final ExecutorService executorService = Executors .newSingleThreadExecutor ();
4855 private final AtomicInteger logCount = new AtomicInteger (0 );
56+ private final ScheduledExecutorService scheduledExecutor = Executors .newScheduledThreadPool (1 );
57+ private final CountDownLatch shutdownLatch = new CountDownLatch (1 );
58+ private volatile boolean running = true ;
4959
5060 /**
5161 * constructor of LogAggregator.
@@ -57,6 +67,15 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
5767 this .centralLogStore = centralLogStore ;
5868 this .minLogLevel = minLogLevel ;
5969 startBufferFlusher ();
70+ // Add shutdown hook for graceful termination
71+ Runtime .getRuntime ().addShutdownHook (new Thread (() -> {
72+ try {
73+ stop ();
74+ } catch (InterruptedException e ) {
75+ LOGGER .warn ("Shutdown interrupted" , e );
76+ Thread .currentThread ().interrupt ();
77+ }
78+ }));
6079 }
6180
6281 /**
@@ -65,6 +84,11 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
6584 * @param logEntry The log entry to collect.
6685 */
6786 public void collectLog (LogEntry logEntry ) {
87+ if (!running ) {
88+ LOGGER .warn ("LogAggregator is shutting down. Skipping log entry." );
89+ return ;
90+ }
91+
6892 if (logEntry .getLevel () == null || minLogLevel == null ) {
6993 LOGGER .warn ("Log level or threshold level is null. Skipping." );
7094 return ;
@@ -75,10 +99,17 @@ public void collectLog(LogEntry logEntry) {
7599 return ;
76100 }
77101
78- buffer .offer (logEntry );
102+ // BlockingQueue.offer() is non-blocking and thread-safe
103+ boolean added = buffer .offer (logEntry );
104+ if (!added ) {
105+ LOGGER .warn ("Failed to add log entry to buffer - queue may be full" );
106+ return ;
107+ }
79108
109+ // Check if immediate flush is needed due to threshold
80110 if (logCount .incrementAndGet () >= BUFFER_THRESHOLD ) {
81- flushBuffer ();
111+ // Schedule immediate flush instead of blocking current thread
112+ scheduledExecutor .execute (this ::flushBuffer );
82113 }
83114 }
84115
@@ -88,32 +119,123 @@ public void collectLog(LogEntry logEntry) {
88119 * @throws InterruptedException If any thread has interrupted the current thread.
89120 */
90121 public void stop () throws InterruptedException {
91- executorService .shutdownNow ();
92- if (!executorService .awaitTermination (10 , TimeUnit .SECONDS )) {
93- LOGGER .error ("Log aggregator did not terminate." );
122+ LOGGER .info ("Stopping LogAggregator..." );
123+ running = false ;
124+
125+ // Shutdown the scheduler gracefully
126+ scheduledExecutor .shutdown ();
127+
128+ try {
129+ // Wait for scheduled tasks to complete
130+ if (!scheduledExecutor .awaitTermination (SHUTDOWN_TIMEOUT_SECONDS , TimeUnit .SECONDS )) {
131+ LOGGER .warn ("Scheduler did not terminate gracefully, forcing shutdown" );
132+ scheduledExecutor .shutdownNow ();
133+
134+ // Wait a bit more for tasks to respond to interruption
135+ if (!scheduledExecutor .awaitTermination (2 , TimeUnit .SECONDS )) {
136+ LOGGER .error ("Scheduler did not terminate after forced shutdown" );
137+ }
138+ }
139+ } finally {
140+ // Final flush of any remaining logs
141+ flushBuffer ();
142+ shutdownLatch .countDown ();
143+ LOGGER .info ("LogAggregator stopped successfully" );
94144 }
95- flushBuffer ();
96145 }
97146
147+
148+ /**
149+ * Waits for the LogAggregator to complete shutdown.
150+ * Useful for testing or controlled shutdown scenarios.
151+ *
152+ * @throws InterruptedException If any thread has interrupted the current thread.
153+ */
154+ public void awaitShutdown () throws InterruptedException {
155+ shutdownLatch .await ();
156+ }
157+
158+
98159 private void flushBuffer () {
99- LogEntry logEntry ;
100- while ((logEntry = buffer .poll ()) != null ) {
101- centralLogStore .storeLog (logEntry );
102- logCount .decrementAndGet ();
160+ if (!running && buffer .isEmpty ()) {
161+ return ;
162+ }
163+
164+ try {
165+ List <LogEntry > batch = new ArrayList <>();
166+ int drained = 0 ;
167+
168+ // Drain up to a reasonable batch size for efficiency
169+ LogEntry logEntry ;
170+ while ((logEntry = buffer .poll ()) != null && drained < 100 ) {
171+ batch .add (logEntry );
172+ drained ++;
173+ }
174+
175+ if (!batch .isEmpty ()) {
176+ LOGGER .debug ("Flushing {} log entries to central store" , batch .size ());
177+
178+ // Process the batch
179+ for (LogEntry entry : batch ) {
180+ centralLogStore .storeLog (entry );
181+ logCount .decrementAndGet ();
182+ }
183+
184+ LOGGER .debug ("Successfully flushed {} log entries" , batch .size ());
185+ }
186+ } catch (Exception e ) {
187+ LOGGER .error ("Error occurred while flushing buffer" , e );
103188 }
104189 }
105190
106- private void startBufferFlusher () {
107- executorService .execute (
191+ /**
192+ * Starts the periodic buffer flusher using ScheduledExecutorService.
193+ * This eliminates the busy-waiting loop with Thread.sleep().
194+ */
195+ private void startPeriodicFlusher () {
196+ scheduledExecutor .scheduleAtFixedRate (
108197 () -> {
109- while (! Thread . currentThread (). isInterrupted () ) {
198+ if ( running ) {
110199 try {
111- Thread .sleep (5000 ); // Flush every 5 seconds.
112200 flushBuffer ();
113- } catch (InterruptedException e ) {
114- Thread . currentThread (). interrupt ( );
201+ } catch (Exception e ) {
202+ LOGGER . error ( "Error in periodic flush" , e );
115203 }
116204 }
117- });
205+ },
206+ FLUSH_INTERVAL_SECONDS , // Initial delay
207+ FLUSH_INTERVAL_SECONDS , // Period
208+ TimeUnit .SECONDS
209+ );
210+
211+ LOGGER .info ("Periodic log flusher started with interval of {} seconds" , FLUSH_INTERVAL_SECONDS );
212+ }
213+ /**
214+ * Gets the current number of buffered log entries.
215+ * Useful for monitoring and testing.
216+ *
217+ * @return Current buffer size
218+ */
219+ public int getBufferSize () {
220+ return buffer .size ();
221+ }
222+
223+ /**
224+ * Gets the current log count.
225+ * Useful for monitoring and testing.
226+ *
227+ * @return Current log count
228+ */
229+ public int getLogCount () {
230+ return logCount .get ();
231+ }
232+
233+ /**
234+ * Checks if the LogAggregator is currently running.
235+ *
236+ * @return true if running, false if stopped or stopping
237+ */
238+ public boolean isRunning () {
239+ return running ;
118240 }
119241}
0 commit comments