2424 */
2525package com .iluwatar .logaggregation ;
2626import java .util .concurrent .BlockingQueue ;
27- import java .util .concurrent .ConcurrentLinkedQueue ;
2827import java .util .concurrent .LinkedBlockingQueue ;
2928import java .util .concurrent .ScheduledExecutorService ;
3029import java .util .concurrent .Executors ;
3433import java .util .ArrayList ;
3534import java .util .List ;
3635import lombok .extern .slf4j .Slf4j ;
37-
3836/**
3937 * Responsible for collecting and buffering logs from different services. Once the logs reach a
4038 * certain threshold or after a certain time interval, they are flushed to the central log store.
@@ -47,26 +45,25 @@ public class LogAggregator {
4745 private static final int BUFFER_THRESHOLD = 3 ;
4846 private static final int FLUSH_INTERVAL_SECONDS = 5 ;
4947 private static final int SHUTDOWN_TIMEOUT_SECONDS = 10 ;
50-
48+
5149 private final CentralLogStore centralLogStore ;
52- private final ConcurrentLinkedQueue <LogEntry > buffer = new ConcurrentLinkedQueue <>();
50+ private final BlockingQueue <LogEntry > buffer = new LinkedBlockingQueue <>();
5351 private final LogLevel minLogLevel ;
54- private final ExecutorService executorService = Executors .newSingleThreadExecutor ( );
52+ private final ScheduledExecutorService scheduledExecutor = Executors .newScheduledThreadPool ( 1 );
5553 private final AtomicInteger logCount = new AtomicInteger (0 );
56- private final ScheduledExecutorService scheduledExecutor = Executors .newScheduledThreadPool (1 );
5754 private final CountDownLatch shutdownLatch = new CountDownLatch (1 );
5855 private volatile boolean running = true ;
59-
6056 /**
6157 * constructor of LogAggregator.
6258 *
6359 * @param centralLogStore central log store implement
6460 * @param minLogLevel min log level to store log
6561 */
6662 public LogAggregator (CentralLogStore centralLogStore , LogLevel minLogLevel ) {
67- this .centralLogStore = centralLogStore ;
63+ this .centralLogStore = centralLogStore ;
6864 this .minLogLevel = minLogLevel ;
69- startBufferFlusher ();
65+ startPeriodicFlusher ();
66+
7067 // Add shutdown hook for graceful termination
7168 Runtime .getRuntime ().addShutdownHook (new Thread (() -> {
7269 try {
@@ -83,7 +80,7 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
8380 *
8481 * @param logEntry The log entry to collect.
8582 */
86- public void collectLog (LogEntry logEntry ) {
83+ public void collectLog (LogEntry logEntry ) {
8784 if (!running ) {
8885 LOGGER .warn ("LogAggregator is shutting down. Skipping log entry." );
8986 return ;
@@ -118,7 +115,7 @@ public void collectLog(LogEntry logEntry) {
118115 *
119116 * @throws InterruptedException If any thread has interrupted the current thread.
120117 */
121- public void stop () throws InterruptedException {
118+ public void stop () throws InterruptedException {
122119 LOGGER .info ("Stopping LogAggregator..." );
123120 running = false ;
124121
@@ -145,19 +142,20 @@ public void stop() throws InterruptedException {
145142 }
146143
147144
145+
148146 /**
149147 * Waits for the LogAggregator to complete shutdown.
150148 * Useful for testing or controlled shutdown scenarios.
151149 *
152150 * @throws InterruptedException If any thread has interrupted the current thread.
153151 */
154- public void awaitShutdown () throws InterruptedException {
152+ public void awaitShutdown () throws InterruptedException {
155153 shutdownLatch .await ();
156154 }
157155
158156
159157 private void flushBuffer () {
160- if (!running && buffer .isEmpty ()) {
158+ if (!running && buffer .isEmpty ()) {
161159 return ;
162160 }
163161
@@ -188,6 +186,7 @@ private void flushBuffer() {
188186 }
189187 }
190188
189+
191190 /**
192191 * Starts the periodic buffer flusher using ScheduledExecutorService.
193192 * This eliminates the busy-waiting loop with Thread.sleep().
0 commit comments