Skip to content

fixing busy waiting in abstraction and eliminate busy-waiting #3302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/**
* Responsible for collecting and buffering logs from different services. Once the logs reach a
* certain threshold or after a certain time interval, they are flushed to the central log store.
Expand All @@ -41,30 +43,49 @@
public class LogAggregator {

private static final int BUFFER_THRESHOLD = 3;
private static final int FLUSH_INTERVAL_SECONDS = 5;
private static final int SHUTDOWN_TIMEOUT_SECONDS = 10;

private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final BlockingQueue<LogEntry> buffer = new LinkedBlockingQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
private final AtomicInteger logCount = new AtomicInteger(0);

private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private volatile boolean running = true;
/**
* constructor of LogAggregator.
*
* @param centralLogStore central log store implement
* @param minLogLevel min log level to store log
*/
public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
this.centralLogStore = centralLogStore;
this.centralLogStore = centralLogStore;
this.minLogLevel = minLogLevel;
startBufferFlusher();
startPeriodicFlusher();

// Add shutdown hook for graceful termination
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
stop();
} catch (InterruptedException e) {
LOGGER.warn("Shutdown interrupted", e);
Thread.currentThread().interrupt();
}
}));
}

/**
* Collects a given log entry, and filters it by the defined log level.
*
* @param logEntry The log entry to collect.
*/
public void collectLog(LogEntry logEntry) {
public void collectLog(LogEntry logEntry) {
if (!running) {
LOGGER.warn("LogAggregator is shutting down. Skipping log entry.");
return;
}

if (logEntry.getLevel() == null || minLogLevel == null) {
LOGGER.warn("Log level or threshold level is null. Skipping.");
return;
Expand All @@ -75,10 +96,17 @@ public void collectLog(LogEntry logEntry) {
return;
}

buffer.offer(logEntry);
// BlockingQueue.offer() is non-blocking and thread-safe
boolean added = buffer.offer(logEntry);
if (!added) {
LOGGER.warn("Failed to add log entry to buffer - queue may be full");
return;
}

// Check if immediate flush is needed due to threshold
if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
flushBuffer();
// Schedule immediate flush instead of blocking current thread
scheduledExecutor.execute(this::flushBuffer);
}
}

Expand All @@ -87,33 +115,126 @@ public void collectLog(LogEntry logEntry) {
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
public void stop() throws InterruptedException {
LOGGER.info("Stopping LogAggregator...");
running = false;

// Shutdown the scheduler gracefully
scheduledExecutor.shutdown();

try {
// Wait for scheduled tasks to complete
if (!scheduledExecutor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
LOGGER.warn("Scheduler did not terminate gracefully, forcing shutdown");
scheduledExecutor.shutdownNow();

// Wait a bit more for tasks to respond to interruption
if (!scheduledExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
LOGGER.error("Scheduler did not terminate after forced shutdown");
}
}
} finally {
// Final flush of any remaining logs
flushBuffer();
shutdownLatch.countDown();
LOGGER.info("LogAggregator stopped successfully");
}
flushBuffer();
}



/**
* Waits for the LogAggregator to complete shutdown.
* Useful for testing or controlled shutdown scenarios.
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void awaitShutdown() throws InterruptedException {
shutdownLatch.await();
}


private void flushBuffer() {
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null) {
centralLogStore.storeLog(logEntry);
logCount.decrementAndGet();
if (!running && buffer.isEmpty()) {
return;
}

try {
List<LogEntry> batch = new ArrayList<>();
int drained = 0;

// Drain up to a reasonable batch size for efficiency
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null && drained < 100) {
batch.add(logEntry);
drained++;
}

if (!batch.isEmpty()) {
LOGGER.debug("Flushing {} log entries to central store", batch.size());

// Process the batch
for (LogEntry entry : batch) {
centralLogStore.storeLog(entry);
logCount.decrementAndGet();
}

LOGGER.debug("Successfully flushed {} log entries", batch.size());
}
} catch (Exception e) {
LOGGER.error("Error occurred while flushing buffer", e);
}
}

private void startBufferFlusher() {
executorService.execute(

/**
* Starts the periodic buffer flusher using ScheduledExecutorService.
* This eliminates the busy-waiting loop with Thread.sleep().
*/
private void startPeriodicFlusher() {
scheduledExecutor.scheduleAtFixedRate(
() -> {
while (!Thread.currentThread().isInterrupted()) {
if (running) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LOGGER.error("Error in periodic flush", e);
}
}
});
},
FLUSH_INTERVAL_SECONDS, // Initial delay
FLUSH_INTERVAL_SECONDS, // Period
TimeUnit.SECONDS
);

LOGGER.info("Periodic log flusher started with interval of {} seconds", FLUSH_INTERVAL_SECONDS);
}
/**
* Gets the current number of buffered log entries.
* Useful for monitoring and testing.
*
* @return Current buffer size
*/
public int getBufferSize() {
return buffer.size();
}

/**
* Gets the current log count.
* Useful for monitoring and testing.
*
* @return Current log count
*/
public int getLogCount() {
return logCount.get();
}

/**
* Checks if the LogAggregator is currently running.
*
* @return true if running, false if stopped or stopping
*/
public boolean isRunning() {
return running;
}
}
100 changes: 67 additions & 33 deletions server-session/src/main/java/com/iluwatar/sessionserver/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@
*/
package com.iluwatar.sessionserver;


import java.util.HashMap;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -57,6 +63,12 @@ public class App {
private static Map<String, Instant> sessionCreationTimes = new HashMap<>();
private static final long SESSION_EXPIRATION_TIME = 10000;

// Scheduler for session expiration task
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static volatile boolean running = true;
private static final CountDownLatch shutdownLatch = new CountDownLatch(1);


/**
* Main entry point.
*
Expand All @@ -78,39 +90,61 @@ public static void main(String[] args) throws IOException {
sessionExpirationTask();

LOGGER.info("Server started. Listening on port 8080...");
// Wait for shutdown signal
try {
shutdownLatch.await();
} catch (InterruptedException e) {
LOGGER.error("Main thread interrupted", e);
Thread.currentThread().interrupt();
}
}

private static void sessionExpirationTask() {
new Thread(
() -> {
while (true) {
try {
LOGGER.info("Session expiration checker started...");
Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time
Instant currentTime = Instant.now();
synchronized (sessions) {
synchronized (sessionCreationTimes) {
Iterator<Map.Entry<String, Instant>> iterator =
sessionCreationTimes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Instant> entry = iterator.next();
if (entry
.getValue()
.plusMillis(SESSION_EXPIRATION_TIME)
.isBefore(currentTime)) {
sessions.remove(entry.getKey());
iterator.remove();
}
}
}
}
LOGGER.info("Session expiration checker finished!");
} catch (InterruptedException e) {
LOGGER.error("An error occurred: ", e);
Thread.currentThread().interrupt();
}
}
})
.start();
if (!running) {
return;
}
try {
LOGGER.info("Session expiration checker started...");
Instant currentTime = Instant.now();

// Use removeIf for efficient removal without explicit synchronization
// ConcurrentHashMap handles thread safety internally
sessionCreationTimes.entrySet().removeIf(entry -> {
if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) {
sessions.remove(entry.getKey());
LOGGER.debug("Expired session: {}", entry.getKey());
return true;
}
return false;
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In App.java, the sessionExpirationTask uses removeIf on sessionCreationTimes. Ensure that concurrent modification is handled correctly, especially if other parts of the application modify this map concurrently.


LOGGER.info("Session expiration checker finished! Active sessions: {}", sessions.size());
} catch (Exception e) {
LOGGER.error("An error occurred during session expiration check: ", e);
}
}

/**
* Gracefully shuts down the session expiration scheduler.
* This method is called by the shutdown hook.
*/
private static void shutdown() {
LOGGER.info("Shutting down session expiration scheduler...");
running = false;
scheduler.shutdown();

try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
LOGGER.warn("Scheduler did not terminate gracefully, forcing shutdown");
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
LOGGER.warn("Shutdown interrupted, forcing immediate shutdown");
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}

shutdownLatch.countDown();
LOGGER.info("Session expiration scheduler shut down complete");
}
}
Loading
Loading