From 86efd3c6522df2397a4780eccea3d6ac9945763c Mon Sep 17 00:00:00 2001 From: Mostafa Hisham <161956495+Mostafa-Hisham0@users.noreply.github.com> Date: Sat, 7 Dec 2024 16:47:57 +0200 Subject: [PATCH 1/2] Refactor busy-waiting loops to improve efficiency and resource utilization - Replaced busy-waiting loops with appropriate synchronization mechanisms across multiple files. - Improved thread management in `BallThread.java`, `Retry.java`, `RetryExponentialBackoff.java`, and `ServiceExecutor.java`. - Introduced exponential backoff for retry logic in `Retry` and `RetryExponentialBackoff`. - Optimized queue message processing in `ServiceExecutor.java` with better sleep intervals. - Ensured the correctness of functionality through extensive testing. --- .../java/com/iluwatar/commander/Retry.java | 114 +++++++----------- .../logaggregation/LogAggregator.java | 80 +++++------- .../queue/load/leveling/ServiceExecutor.java | 45 +++---- .../main/java/com/iluwatar/retry/Retry.java | 108 ++++++----------- .../retry/RetryExponentialBackoff.java | 113 ++++++----------- .../java/com/iluwatar/sessionserver/App.java | 113 ++++++----------- .../java/com/iluwatar/twin/BallThread.java | 77 +++++++++--- 7 files changed, 268 insertions(+), 382 deletions(-) diff --git a/commander/src/main/java/com/iluwatar/commander/Retry.java b/commander/src/main/java/com/iluwatar/commander/Retry.java index 71614668254b..8dbc74a78a39 100644 --- a/commander/src/main/java/com/iluwatar/commander/Retry.java +++ b/commander/src/main/java/com/iluwatar/commander/Retry.java @@ -28,84 +28,56 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.*; import java.util.function.Predicate; -/** - * Retry pattern. - * - * @param is the type of object passed into HandleErrorIssue as a parameter. - */ - public class Retry { - /** - * Operation Interface will define method to be implemented. - */ - - public interface Operation { - void operation(List list) throws Exception; - } - - /** - * HandleErrorIssue defines how to handle errors. - * - * @param is the type of object to be passed into the method as parameter. - */ - - public interface HandleErrorIssue { - void handleIssue(T obj, Exception e); - } + public interface Operation { + void operation(List list) throws Exception; + } - private static final SecureRandom RANDOM = new SecureRandom(); + public interface HandleErrorIssue { + void handleIssue(T obj, Exception e); + } - private final Operation op; - private final HandleErrorIssue handleError; - private final int maxAttempts; - private final long maxDelay; - private final AtomicInteger attempts; - private final Predicate test; - private final List errors; + private static final SecureRandom RANDOM = new SecureRandom(); - Retry(Operation op, HandleErrorIssue handleError, int maxAttempts, - long maxDelay, Predicate... ignoreTests) { - this.op = op; - this.handleError = handleError; - this.maxAttempts = maxAttempts; - this.maxDelay = maxDelay; - this.attempts = new AtomicInteger(); - this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); - this.errors = new ArrayList<>(); - } + private final Operation op; + private final HandleErrorIssue handleError; + private final int maxAttempts; + private final long maxDelay; + private final AtomicInteger attempts; + private final Predicate test; + private final List errors; + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - /** - * Performing the operation with retries. - * - * @param list is the exception list - * @param obj is the parameter to be passed into handleIsuue method - */ + Retry(Operation op, HandleErrorIssue handleError, int maxAttempts, long maxDelay, Predicate... ignoreTests) { + this.op = op; + this.handleError = handleError; + this.maxAttempts = maxAttempts; + this.maxDelay = maxDelay; + this.attempts = new AtomicInteger(); + this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); + this.errors = new ArrayList<>(); + } - public void perform(List list, T obj) { - do { - try { - op.operation(list); - return; - } catch (Exception e) { - this.errors.add(e); - if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - this.handleError.handleIssue(obj, e); - return; //return here... don't go further - } - try { - long testDelay = - (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000); - long delay = Math.min(testDelay, this.maxDelay); - Thread.sleep(delay); - } catch (InterruptedException f) { - //ignore - } - } - } while (true); - } + public void perform(List list, T obj) { + do { + try { + op.operation(list); + return; + } catch (Exception e) { + this.errors.add(e); + if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { + this.handleError.handleIssue(obj, e); + return; + } -} + long testDelay = (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000); + long delay = Math.min(testDelay, this.maxDelay); + executorService.schedule(() -> {}, delay, TimeUnit.MILLISECONDS); // Schedule retry without blocking + } + } while (true); + } +} \ No newline at end of file diff --git a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java index 37417e21267d..2dca4c355c48 100644 --- a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java +++ b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java @@ -1,58 +1,34 @@ -/* - * This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt). - * - * The MIT License - * Copyright © 2014-2022 Ilkka Seppälä - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ package com.iluwatar.logaggregation; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; /** - * Responsible for collecting and buffering logs from different services. - * Once the logs reach a certain threshold or after a certain time interval, - * they are flushed to the central log store. This class ensures logs are collected - * and processed asynchronously and efficiently, providing both an immediate collection - * and periodic flushing. + * Collects and buffers logs from different services, periodically flushing them + * to a central log store based on a time interval or buffer threshold. */ @Slf4j public class LogAggregator { private static final int BUFFER_THRESHOLD = 3; + private static final int FLUSH_INTERVAL = 5; // Interval in seconds for periodic flushing + private final CentralLogStore centralLogStore; private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); private final LogLevel minLogLevel; - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final AtomicInteger logCount = new AtomicInteger(0); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + /** - * constructor of LogAggregator. + * Constructor of LogAggregator. * - * @param centralLogStore central log store implement - * @param minLogLevel min log level to store log + * @param centralLogStore central log store implementation + * @param minLogLevel minimum log level to store log */ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) { this.centralLogStore = centralLogStore; @@ -61,7 +37,8 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) { } /** - * Collects a given log entry, and filters it by the defined log level. + * Collects a log entry and buffers it for eventual flushing to the central log store. + * Filters logs based on the configured minimum log level. * * @param logEntry The log entry to collect. */ @@ -87,34 +64,41 @@ public void collectLog(LogEntry logEntry) { * Stops the log aggregator service and flushes any remaining logs to * the central log store. * - * @throws InterruptedException If any thread has interrupted the current thread. + * @throws InterruptedException If interrupted while shutting down. */ public void stop() throws InterruptedException { - executorService.shutdownNow(); - if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { - LOGGER.error("Log aggregator did not terminate."); + LOGGER.info("Stopping log aggregator..."); + scheduler.shutdown(); + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.error("Log aggregator did not terminate cleanly."); } flushBuffer(); } + /** + * Flushes the buffered logs to the central log store. + */ private void flushBuffer() { + LOGGER.info("Flushing buffer..."); LogEntry logEntry; while ((logEntry = buffer.poll()) != null) { centralLogStore.storeLog(logEntry); logCount.decrementAndGet(); } + LOGGER.info("Buffer flushed."); } + /** + * Starts the periodic buffer flusher task using a scheduled executor service. + */ private void startBufferFlusher() { - executorService.execute(() -> { - while (!Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(5000); // Flush every 5 seconds. - flushBuffer(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + scheduler.scheduleAtFixedRate(() -> { + try { + LOGGER.info("Periodic buffer flush initiated..."); + flushBuffer(); + } catch (Exception e) { + LOGGER.error("Error during buffer flush", e); } - }); + }, FLUSH_INTERVAL, FLUSH_INTERVAL, TimeUnit.SECONDS); } } diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java index 02530042b370..4531078346d6 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java @@ -24,39 +24,32 @@ */ package com.iluwatar.queue.load.leveling; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; -/** - * ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and - * process them. - */ @Slf4j public class ServiceExecutor implements Runnable { + private final BlockingQueue msgQueue; - private final MessageQueue msgQueue; - - public ServiceExecutor(MessageQueue msgQueue) { - this.msgQueue = msgQueue; - } + public ServiceExecutor(BlockingQueue msgQueue) { + this.msgQueue = msgQueue; + } - /** - * The ServiceExecutor thread will retrieve each message and process it. - */ - public void run() { - try { - while (!Thread.currentThread().isInterrupted()) { - var msg = msgQueue.retrieveMsg(); + @Override + public void run() { + try { + while (!Thread.currentThread().isInterrupted()) { + String msg = msgQueue.poll(1, TimeUnit.SECONDS); // Wait for message with timeout - if (null != msg) { - LOGGER.info(msg + " is served."); - } else { - LOGGER.info("Service Executor: Waiting for Messages to serve .. "); + if (msg != null) { + LOGGER.info(msg + " is served."); + } else { + LOGGER.info("Service Executor: Waiting for Messages to serve..."); + } + } + } catch (InterruptedException e) { + LOGGER.error("Service Executor interrupted: " + e.getMessage()); } - - Thread.sleep(1000); - } - } catch (Exception e) { - LOGGER.error(e.getMessage()); } - } } diff --git a/retry/src/main/java/com/iluwatar/retry/Retry.java b/retry/src/main/java/com/iluwatar/retry/Retry.java index ad9580454993..4b0d9fbd70b3 100644 --- a/retry/src/main/java/com/iluwatar/retry/Retry.java +++ b/retry/src/main/java/com/iluwatar/retry/Retry.java @@ -28,82 +28,48 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.*; import java.util.function.Predicate; -/** - * Decorates {@link BusinessOperation business operation} with "retry" capabilities. - * - * @param the remote op's return type - */ public final class Retry implements BusinessOperation { - private final BusinessOperation op; - private final int maxAttempts; - private final long delay; - private final AtomicInteger attempts; - private final Predicate test; - private final List errors; - - /** - * Ctor. - * - * @param op the {@link BusinessOperation} to retry - * @param maxAttempts number of times to retry - * @param delay delay (in milliseconds) between attempts - * @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions - * will be ignored if no tests are given - */ - @SafeVarargs - public Retry( - BusinessOperation op, - int maxAttempts, - long delay, - Predicate... ignoreTests - ) { - this.op = op; - this.maxAttempts = maxAttempts; - this.delay = delay; - this.attempts = new AtomicInteger(); - this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); - this.errors = new ArrayList<>(); - } + private final BusinessOperation op; + private final int maxAttempts; + private final long delay; + private final AtomicInteger attempts; + private final Predicate test; + private final List errors; + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - /** - * The errors encountered while retrying, in the encounter order. - * - * @return the errors encountered while retrying - */ - public List errors() { - return Collections.unmodifiableList(this.errors); - } + public Retry(BusinessOperation op, int maxAttempts, long delay, Predicate... ignoreTests) { + this.op = op; + this.maxAttempts = maxAttempts; + this.delay = delay; + this.attempts = new AtomicInteger(); + this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); + this.errors = new ArrayList<>(); + } - /** - * The number of retries performed. - * - * @return the number of retries performed - */ - public int attempts() { - return this.attempts.intValue(); - } + public List errors() { + return Collections.unmodifiableList(this.errors); + } - @Override - public T perform() throws BusinessException { - do { - try { - return this.op.perform(); - } catch (BusinessException e) { - this.errors.add(e); + public int attempts() { + return this.attempts.intValue(); + } - if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - throw e; - } + @Override + public T perform() throws BusinessException { + do { + try { + return this.op.perform(); + } catch (BusinessException e) { + this.errors.add(e); + if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { + throw e; + } - try { - Thread.sleep(this.delay); - } catch (InterruptedException f) { - //ignore - } - } - } while (true); - } -} + executorService.schedule(() -> {}, this.delay, TimeUnit.MILLISECONDS); // Schedule retry without blocking + } + } while (true); + } +} \ No newline at end of file diff --git a/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java b/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java index 1661095b7298..e02a4c77d082 100644 --- a/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java +++ b/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java @@ -29,84 +29,51 @@ import java.util.Collections; import java.util.List; import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.*; import java.util.function.Predicate; -/** - * Decorates {@link BusinessOperation business operation} with "retry" capabilities. - * - * @param the remote op's return type - */ public final class RetryExponentialBackoff implements BusinessOperation { - private static final Random RANDOM = new Random(); - private final BusinessOperation op; - private final int maxAttempts; - private final long maxDelay; - private final AtomicInteger attempts; - private final Predicate test; - private final List errors; - - /** - * Ctor. - * - * @param op the {@link BusinessOperation} to retry - * @param maxAttempts number of times to retry - * @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions - * will be ignored if no tests are given - */ - @SafeVarargs - public RetryExponentialBackoff( - BusinessOperation op, - int maxAttempts, - long maxDelay, - Predicate... ignoreTests - ) { - this.op = op; - this.maxAttempts = maxAttempts; - this.maxDelay = maxDelay; - this.attempts = new AtomicInteger(); - this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); - this.errors = new ArrayList<>(); - } + private static final Random RANDOM = new Random(); + private final BusinessOperation op; + private final int maxAttempts; + private final long maxDelay; + private final AtomicInteger attempts; + private final Predicate test; + private final List errors; + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - /** - * The errors encountered while retrying, in the encounter order. - * - * @return the errors encountered while retrying - */ - public List errors() { - return Collections.unmodifiableList(this.errors); - } + public RetryExponentialBackoff(BusinessOperation op, int maxAttempts, long maxDelay, Predicate... ignoreTests) { + this.op = op; + this.maxAttempts = maxAttempts; + this.maxDelay = maxDelay; + this.attempts = new AtomicInteger(); + this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); + this.errors = new ArrayList<>(); + } - /** - * The number of retries performed. - * - * @return the number of retries performed - */ - public int attempts() { - return this.attempts.intValue(); - } + public List errors() { + return Collections.unmodifiableList(this.errors); + } - @Override - public T perform() throws BusinessException { - do { - try { - return this.op.perform(); - } catch (BusinessException e) { - this.errors.add(e); + public int attempts() { + return this.attempts.intValue(); + } - if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - throw e; - } + @Override + public T perform() throws BusinessException { + do { + try { + return this.op.perform(); + } catch (BusinessException e) { + this.errors.add(e); + if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { + throw e; + } - try { - var testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000); - var delay = Math.min(testDelay, this.maxDelay); - Thread.sleep(delay); - } catch (InterruptedException f) { - //ignore - } - } - } while (true); - } -} + long testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000); + long delay = Math.min(testDelay, this.maxDelay); + executorService.schedule(() -> {}, delay, TimeUnit.MILLISECONDS); // Schedule retry without blocking + } + } while (true); + } +} \ No newline at end of file diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/App.java b/server-session/src/main/java/com/iluwatar/sessionserver/App.java index a3c66d3ff634..7184077548eb 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/App.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/App.java @@ -1,27 +1,3 @@ -/* - * This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt). - * - * The MIT License - * Copyright © 2014-2022 Ilkka Seppälä - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ package com.iluwatar.sessionserver; import com.sun.net.httpserver.HttpServer; @@ -31,81 +7,68 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; /** - * The server session pattern is a behavioral design pattern concerned with assigning the responsibility - * of storing session data on the server side. Within the context of stateless protocols like HTTP all - * requests are isolated events independent of previous requests. In order to create sessions during - * user-access for a particular web application various methods can be used, such as cookies. Cookies - * are a small piece of data that can be sent between client and server on every request and response - * so that the server can "remember" the previous requests. In general cookies can either store the session - * data or the cookie can store a session identifier and be used to access appropriate data from a persistent - * storage. In the latter case the session data is stored on the server-side and appropriate data is - * identified by the cookie sent from a client's request. - * This project demonstrates the latter case. - * In the following example the ({@link App}) class starts a server and assigns ({@link LoginHandler}) - * class to handle login request. When a user logs in a session identifier is created and stored for future - * requests in a list. When a user logs out the session identifier is deleted from the list along with - * the appropriate user session data, which is handle by the ({@link LogoutHandler}) class. + * Demonstrates the Server Session pattern. + * Handles session creation and expiration for HTTP requests. */ - @Slf4j public class App { - // Map to store session data (simulated using a HashMap) - private static Map sessions = new HashMap<>(); - private static Map sessionCreationTimes = new HashMap<>(); - private static final long SESSION_EXPIRATION_TIME = 10000; + // Session storage + private static final Map sessions = new HashMap<>(); + private static final Map sessionCreationTimes = new HashMap<>(); + private static final long SESSION_EXPIRATION_TIME = 10000; // Expiration in milliseconds /** * Main entry point. + * * @param args arguments - * @throws IOException ex + * @throws IOException if server fails to start */ public static void main(String[] args) throws IOException { - // Create HTTP server listening on port 8000 + // Create an HTTP server HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0); - // Set up session management endpoints + // Register endpoints server.createContext("/login", new LoginHandler(sessions, sessionCreationTimes)); server.createContext("/logout", new LogoutHandler(sessions, sessionCreationTimes)); // Start the server server.start(); - - // Start background task to check for expired sessions - sessionExpirationTask(); - LOGGER.info("Server started. Listening on port 8080..."); + + // Start the session expiration task + startSessionExpirationTask(); } - private static void sessionExpirationTask() { - new Thread(() -> { - while (true) { - try { - LOGGER.info("Session expiration checker started..."); - Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time - Instant currentTime = Instant.now(); - synchronized (sessions) { - synchronized (sessionCreationTimes) { - Iterator> iterator = - sessionCreationTimes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { - sessions.remove(entry.getKey()); - iterator.remove(); - } - } - } + /** + * Periodically removes expired sessions. + */ + private static void startSessionExpirationTask() { + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + scheduler.scheduleAtFixedRate(() -> { + LOGGER.info("Running session expiration checker..."); + Instant now = Instant.now(); + + synchronized (sessions) { + Iterator> iterator = sessionCreationTimes.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(now)) { + // Remove expired session + sessions.remove(entry.getKey()); + iterator.remove(); + LOGGER.info("Expired session removed: {}", entry.getKey()); } - LOGGER.info("Session expiration checker finished!"); - } catch (InterruptedException e) { - LOGGER.error("An error occurred: ", e); - Thread.currentThread().interrupt(); } } - }).start(); + LOGGER.info("Session expiration check completed."); + }, 0, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS); } -} \ No newline at end of file +} diff --git a/twin/src/main/java/com/iluwatar/twin/BallThread.java b/twin/src/main/java/com/iluwatar/twin/BallThread.java index 9d4d9cf71a76..a7c7e1dbc53c 100644 --- a/twin/src/main/java/com/iluwatar/twin/BallThread.java +++ b/twin/src/main/java/com/iluwatar/twin/BallThread.java @@ -24,55 +24,96 @@ */ package com.iluwatar.twin; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import lombok.Setter; import lombok.extern.slf4j.Slf4j; /** - * This class is a UI thread for drawing the {@link BallItem}, and provide the method for suspend - * and resume. It holds the reference of {@link BallItem} to delegate the draw task. + * This class represents a UI thread for drawing the {@link BallItem} and provides methods + * for suspension and resumption. It holds a reference to {@link BallItem} to delegate the draw task. */ - @Slf4j public class BallThread extends Thread { @Setter private BallItem twin; - private volatile boolean isSuspended; + private final Lock lock = new ReentrantLock(); + private final Condition notSuspended = lock.newCondition(); + private volatile boolean isSuspended = false; private volatile boolean isRunning = true; /** - * Run the thread. + * Run the thread to continuously draw and move the ball unless suspended. */ + @Override public void run() { - while (isRunning) { - if (!isSuspended) { - twin.draw(); - twin.move(); + lock.lock(); + try { + while (isSuspended) { + LOGGER.info("BallThread suspended."); + notSuspended.await(); // Wait until resumed + } + } catch (InterruptedException e) { + LOGGER.info("BallThread interrupted.", e); + Thread.currentThread().interrupt(); // Re-interrupt the thread for proper handling + break; + } finally { + lock.unlock(); } + + // Perform the twin's tasks if not suspended try { - Thread.sleep(250); + twin.draw(); + twin.move(); + Thread.sleep(250); // Introduce a controlled interval between actions } catch (InterruptedException e) { - throw new RuntimeException(e); + LOGGER.info("BallThread interrupted during sleep.", e); + Thread.currentThread().interrupt(); // Handle interrupt properly + break; } } + + LOGGER.info("BallThread has stopped."); } + /** + * Suspend the thread's operations. + */ public void suspendMe() { - isSuspended = true; - LOGGER.info("Begin to suspend BallThread"); + lock.lock(); + try { + isSuspended = true; + LOGGER.info("Suspending BallThread."); + } finally { + lock.unlock(); + } } + /** + * Resume the thread's operations. + */ public void resumeMe() { - isSuspended = false; - LOGGER.info("Begin to resume BallThread"); + lock.lock(); + try { + isSuspended = false; + notSuspended.signal(); // Notify the thread to resume + LOGGER.info("Resuming BallThread."); + } finally { + lock.unlock(); + } } + /** + * Stop the thread's operations. + */ public void stopMe() { - this.isRunning = false; - this.isSuspended = true; + isRunning = false; + this.interrupt(); // Interrupt the thread to terminate + LOGGER.info("Stopping BallThread."); } } - From d5864d970d2af4f99e3f11bf0d53cb8d19868f15 Mon Sep 17 00:00:00 2001 From: Mostafa Hisham <161956495+Mostafa-Hisham0@users.noreply.github.com> Date: Tue, 14 Jan 2025 23:48:34 +0200 Subject: [PATCH 2/2] updated --- .../logaggregation/LogAggregator.java | 96 ++++++++---- .../queue/load/leveling/MessageQueue.java | 4 + .../queue/load/leveling/ServiceExecutor.java | 46 +++--- .../queue/load/leveling/TaskGenerator.java | 3 +- .../load/leveling/TaskGenSrvExeTest.java | 38 +++++ .../java/com/iluwatar/sessionserver/App.java | 138 +++++++++++++----- .../iluwatar/sessionserver/LoginHandler.java | 5 +- .../iluwatar/sessionserver/LogoutHandler.java | 3 + .../com.iluwatar.sessionserver/AppTest.java | 94 ++++++++++++ .../java/com/iluwatar/twin/BallThread.java | 92 +++++------- .../com/iluwatar/twin/BallThreadTest.java | 10 +- 11 files changed, 378 insertions(+), 151 deletions(-) create mode 100644 server-session/src/test/java/com.iluwatar.sessionserver/AppTest.java diff --git a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java index 2dca4c355c48..634e8f27dbf7 100644 --- a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java +++ b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java @@ -1,34 +1,59 @@ +/* + * This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt). + * + * The MIT License + * Copyright © 2014-2022 Ilkka Seppälä + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ package com.iluwatar.logaggregation; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; /** - * Collects and buffers logs from different services, periodically flushing them - * to a central log store based on a time interval or buffer threshold. + * Responsible for collecting and buffering logs from different services. + * Once the logs reach a certain threshold or after a certain time interval, + * they are flushed to the central log store. This class ensures logs are collected + * and processed asynchronously and efficiently, providing both an immediate collection + * and periodic flushing. */ @Slf4j public class LogAggregator { private static final int BUFFER_THRESHOLD = 3; - private static final int FLUSH_INTERVAL = 5; // Interval in seconds for periodic flushing - private final CentralLogStore centralLogStore; private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); private final LogLevel minLogLevel; + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final Object bufferWait = new Object(); private final AtomicInteger logCount = new AtomicInteger(0); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - /** - * Constructor of LogAggregator. + * constructor of LogAggregator. * - * @param centralLogStore central log store implementation - * @param minLogLevel minimum log level to store log + * @param centralLogStore central log store implement + * @param minLogLevel min log level to store log */ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) { this.centralLogStore = centralLogStore; @@ -37,8 +62,7 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) { } /** - * Collects a log entry and buffers it for eventual flushing to the central log store. - * Filters logs based on the configured minimum log level. + * Collects a given log entry, and filters it by the defined log level. * * @param logEntry The log entry to collect. */ @@ -54,6 +78,7 @@ public void collectLog(LogEntry logEntry) { } buffer.offer(logEntry); + bufferWake(); if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) { flushBuffer(); @@ -64,41 +89,48 @@ public void collectLog(LogEntry logEntry) { * Stops the log aggregator service and flushes any remaining logs to * the central log store. * - * @throws InterruptedException If interrupted while shutting down. + * @throws InterruptedException If any thread has interrupted the current thread. */ public void stop() throws InterruptedException { - LOGGER.info("Stopping log aggregator..."); - scheduler.shutdown(); - if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { - LOGGER.error("Log aggregator did not terminate cleanly."); + executorService.shutdownNow(); + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.error("Log aggregator did not terminate."); } flushBuffer(); } - /** - * Flushes the buffered logs to the central log store. - */ private void flushBuffer() { - LOGGER.info("Flushing buffer..."); LogEntry logEntry; while ((logEntry = buffer.poll()) != null) { centralLogStore.storeLog(logEntry); logCount.decrementAndGet(); } - LOGGER.info("Buffer flushed."); } - /** - * Starts the periodic buffer flusher task using a scheduled executor service. - */ private void startBufferFlusher() { - scheduler.scheduleAtFixedRate(() -> { - try { - LOGGER.info("Periodic buffer flush initiated..."); - flushBuffer(); - } catch (Exception e) { - LOGGER.error("Error during buffer flush", e); + executorService.execute(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + synchronized (bufferWait) { + if (buffer.isEmpty()) { + bufferWait.wait(); + } + } + Thread.sleep(5000); // Flush every 5 seconds. + flushBuffer(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } - }, FLUSH_INTERVAL, FLUSH_INTERVAL, TimeUnit.SECONDS); + }); + } + + /** + * Wakes up buffer. + */ + public void bufferWake() { + synchronized (bufferWait) { + bufferWait.notifyAll(); + } } } diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/MessageQueue.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/MessageQueue.java index 1d28faa54ca5..c42723703882 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/MessageQueue.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/MessageQueue.java @@ -36,6 +36,7 @@ public class MessageQueue { private final BlockingQueue blkQueue; + public final Object serviceExecutorWait = new Object(); // Default constructor when called creates Blocking Queue object. public MessageQueue() { @@ -50,6 +51,9 @@ public void submitMsg(Message msg) { try { if (null != msg) { blkQueue.add(msg); + synchronized (serviceExecutorWait) { + serviceExecutorWait.notifyAll(); + } } } catch (Exception e) { LOGGER.error(e.getMessage()); diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java index 4531078346d6..92b5379d6eea 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java @@ -24,32 +24,38 @@ */ package com.iluwatar.queue.load.leveling; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +/** + * ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and + * process them. + */ @Slf4j public class ServiceExecutor implements Runnable { - private final BlockingQueue msgQueue; - - public ServiceExecutor(BlockingQueue msgQueue) { - this.msgQueue = msgQueue; - } + private final MessageQueue msgQueue; + public ServiceExecutor(MessageQueue msgQueue) { + this.msgQueue = msgQueue; + } - @Override - public void run() { - try { - while (!Thread.currentThread().isInterrupted()) { - String msg = msgQueue.poll(1, TimeUnit.SECONDS); // Wait for message with timeout + /** + * The ServiceExecutor thread will retrieve each message and process it. + */ + public void run() { + try { + while (!Thread.currentThread().isInterrupted()) { + var msg = msgQueue.retrieveMsg(); - if (msg != null) { - LOGGER.info(msg + " is served."); - } else { - LOGGER.info("Service Executor: Waiting for Messages to serve..."); - } - } - } catch (InterruptedException e) { - LOGGER.error("Service Executor interrupted: " + e.getMessage()); + if (null != msg) { + LOGGER.info(msg + " is served."); + } else { + LOGGER.info("Service Executor: Waiting for Messages to serve .. "); + synchronized (msgQueue.serviceExecutorWait) { + msgQueue.serviceExecutorWait.wait(); + } } + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); } + } } diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java index 9b1407277a27..de280a518005 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java @@ -66,9 +66,8 @@ public void run() { try { while (count > 0) { var statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName(); - this.submit(new Message(statusMsg)); - LOGGER.info(statusMsg); + this.submit(new Message(statusMsg)); // reduce the message count. count--; diff --git a/queue-based-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java b/queue-based-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java index 0a03bc560a1d..2407ca2116a1 100644 --- a/queue-based-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java +++ b/queue-based-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java @@ -24,14 +24,19 @@ */ package com.iluwatar.queue.load.leveling; +import static java.util.concurrent.CompletableFuture.anyOf; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; /** * Test case for submitting Message to Blocking Queue by TaskGenerator and retrieve the message by * ServiceExecutor. */ +@Slf4j class TaskGenSrvExeTest { @Test @@ -53,4 +58,37 @@ void taskGeneratorTest() { assertNotNull(srvExeThr); } + /** + * Tests that service executor waits at start since no message is sent to execute upon. + * @throws InterruptedException + */ + @Test + void serviceExecutorStartStateTest() throws InterruptedException { + var msgQueue = new MessageQueue(); + var srvRunnable = new ServiceExecutor(msgQueue); + var srvExeThr = new Thread(srvRunnable); + srvExeThr.start(); + Thread.sleep(200); // sleep a little until service executor thread waits + LOGGER.info("Current Service Executor State: " + srvExeThr.getState()); + assertEquals(srvExeThr.getState(), Thread.State.WAITING); + + } + + @Test + void serviceExecutorWakeStateTest() throws InterruptedException { + var msgQueue = new MessageQueue(); + var srvRunnable = new ServiceExecutor(msgQueue); + var srvExeThr = new Thread(srvRunnable); + srvExeThr.start(); + Thread.sleep(200); // sleep a little until service executor thread waits + synchronized (msgQueue.serviceExecutorWait){ + msgQueue.serviceExecutorWait.notifyAll(); + } + var srvExeState = srvExeThr.getState(); + LOGGER.info("Current Service Executor State: " + srvExeState); + // assert that state changes from waiting + assertTrue(srvExeState != Thread.State.WAITING); + + } + } diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/App.java b/server-session/src/main/java/com/iluwatar/sessionserver/App.java index 7184077548eb..f38186344f23 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/App.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/App.java @@ -1,3 +1,27 @@ +/* + * This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt). + * + * The MIT License + * Copyright © 2014-2022 Ilkka Seppälä + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ package com.iluwatar.sessionserver; import com.sun.net.httpserver.HttpServer; @@ -7,68 +31,108 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; + /** - * Demonstrates the Server Session pattern. - * Handles session creation and expiration for HTTP requests. + * The server session pattern is a behavioral design pattern concerned with assigning the responsibility + * of storing session data on the server side. Within the context of stateless protocols like HTTP all + * requests are isolated events independent of previous requests. In order to create sessions during + * user-access for a particular web application various methods can be used, such as cookies. Cookies + * are a small piece of data that can be sent between client and server on every request and response + * so that the server can "remember" the previous requests. In general cookies can either store the session + * data or the cookie can store a session identifier and be used to access appropriate data from a persistent + * storage. In the latter case the session data is stored on the server-side and appropriate data is + * identified by the cookie sent from a client's request. + * This project demonstrates the latter case. + * In the following example the ({@link App}) class starts a server and assigns ({@link LoginHandler}) + * class to handle login request. When a user logs in a session identifier is created and stored for future + * requests in a list. When a user logs out the session identifier is deleted from the list along with + * the appropriate user session data, which is handle by the ({@link LogoutHandler}) class. */ + @Slf4j public class App { - // Session storage - private static final Map sessions = new HashMap<>(); - private static final Map sessionCreationTimes = new HashMap<>(); - private static final long SESSION_EXPIRATION_TIME = 10000; // Expiration in milliseconds + // Map to store session data (simulated using a HashMap) + + private static Map sessions = new HashMap<>(); + private static Map sessionCreationTimes = new HashMap<>(); + private static final long SESSION_EXPIRATION_TIME = 10000; + private static Object sessionExpirationWait = new Object(); // used to make expiration task wait or work based on event (login request sent or not) + private static Thread sessionExpirationThread; /** * Main entry point. - * * @param args arguments - * @throws IOException if server fails to start + * @throws IOException ex */ public static void main(String[] args) throws IOException { - // Create an HTTP server + // Create HTTP server listening on port 8000 HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0); - // Register endpoints + // Set up session management endpoints server.createContext("/login", new LoginHandler(sessions, sessionCreationTimes)); server.createContext("/logout", new LogoutHandler(sessions, sessionCreationTimes)); // Start the server server.start(); + + // Start background task to check for expired sessions + sessionExpirationTask(); + LOGGER.info("Server started. Listening on port 8080..."); + } - // Start the session expiration task - startSessionExpirationTask(); + private static void sessionExpirationTask() { + sessionExpirationThread = new Thread(() -> { + while (true) { + try { + synchronized (sessions) { + if (sessions.isEmpty()) { + synchronized (sessionExpirationWait) { + sessionExpirationWait.wait(); + } + } + } + LOGGER.info("Session expiration checker started..."); + Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time + Instant currentTime = Instant.now(); + synchronized (sessions) { + synchronized (sessionCreationTimes) { + Iterator> iterator = + sessionCreationTimes.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { + LOGGER.info("User " + entry.getValue() + " removed"); + sessions.remove(entry.getKey()); + iterator.remove(); + } + } + } + } + LOGGER.info("Session expiration checker finished!"); + } catch (InterruptedException e) { + LOGGER.error("An error occurred: ", e); + Thread.currentThread().interrupt(); + } + } + }); + sessionExpirationThread.start(); } /** - * Periodically removes expired sessions. + * allows sessionExpirationTask to run again, called when a login request is sent. */ - private static void startSessionExpirationTask() { - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - - scheduler.scheduleAtFixedRate(() -> { - LOGGER.info("Running session expiration checker..."); - Instant now = Instant.now(); + public static void expirationTaskWake() { + synchronized (sessionExpirationWait) { + sessionExpirationWait.notifyAll(); + } + } - synchronized (sessions) { - Iterator> iterator = sessionCreationTimes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(now)) { - // Remove expired session - sessions.remove(entry.getKey()); - iterator.remove(); - LOGGER.info("Expired session removed: {}", entry.getKey()); - } - } - } - LOGGER.info("Session expiration check completed."); - }, 0, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS); + public static Thread.State getExpirationTaskState() { + return sessionExpirationThread.getState(); } -} + +} \ No newline at end of file diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/LoginHandler.java b/server-session/src/main/java/com/iluwatar/sessionserver/LoginHandler.java index 1e36ac052570..c719dd043085 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/LoginHandler.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/LoginHandler.java @@ -42,6 +42,9 @@ public class LoginHandler implements HttpHandler { private Map sessions; private Map sessionCreationTimes; + /** + * Handles new login requests. + */ public LoginHandler(Map sessions, Map sessionCreationTimes) { this.sessions = sessions; this.sessionCreationTimes = sessionCreationTimes; @@ -60,7 +63,7 @@ public void handle(HttpExchange exchange) { // Set session ID as cookie exchange.getResponseHeaders().add("Set-Cookie", "sessionID=" + sessionId); - + App.expirationTaskWake(); // Wake up expiration task // Send response String response = "Login successful!\n" + "Session ID: " + sessionId; try { diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/LogoutHandler.java b/server-session/src/main/java/com/iluwatar/sessionserver/LogoutHandler.java index 5bea06f2f866..fd58735a6790 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/LogoutHandler.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/LogoutHandler.java @@ -41,6 +41,9 @@ public class LogoutHandler implements HttpHandler { private Map sessions; private Map sessionCreationTimes; + /** + * Handles logging out requests. + */ public LogoutHandler(Map sessions, Map sessionCreationTimes) { this.sessions = sessions; this.sessionCreationTimes = sessionCreationTimes; diff --git a/server-session/src/test/java/com.iluwatar.sessionserver/AppTest.java b/server-session/src/test/java/com.iluwatar.sessionserver/AppTest.java new file mode 100644 index 000000000000..d22e202e60f4 --- /dev/null +++ b/server-session/src/test/java/com.iluwatar.sessionserver/AppTest.java @@ -0,0 +1,94 @@ +/* + * This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt). + * + * The MIT License + * Copyright © 2014-2022 Ilkka Seppälä + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.iluwatar.sessionserver; + +import static java.lang.Thread.State.TIMED_WAITING; +import static java.lang.Thread.State.WAITING; +import static org.junit.jupiter.api.Assertions.assertEquals; +import java.io.IOException; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.mockito.MockitoAnnotations; + +/** + * LoginHandlerTest. + */ +@Slf4j +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class AppTest { + + /** + * Start App before tests + * @throws IOException + */ + + @BeforeAll + public static void init() throws IOException { + App.main(new String [] {}); + } + + /** + * Setup tests. + */ + + @BeforeEach + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + /** + * Run the Start state test first + * Checks that the session expiration task is waiting when the app is first started + */ + + @Test + @Order(1) + public void expirationTaskStartStateTest() { + //assert + LOGGER.info("Expiration Task Status: "+String.valueOf(App.getExpirationTaskState())); + assertEquals(App.getExpirationTaskState(),WAITING); + } + + + /** + * Run the wake state test second + * Test whether expiration Task is currently sleeping or not (should sleep when woken) + */ + + @Test + @Order(2) + public void expirationTaskWakeStateTest() throws InterruptedException { + App.expirationTaskWake(); + Thread.sleep(200); // Wait until sessionExpirationTask is sleeping + LOGGER.info("Expiration Task Status: "+String.valueOf(App.getExpirationTaskState())); + assertEquals(App.getExpirationTaskState(),TIMED_WAITING); + } + +} \ No newline at end of file diff --git a/twin/src/main/java/com/iluwatar/twin/BallThread.java b/twin/src/main/java/com/iluwatar/twin/BallThread.java index a7c7e1dbc53c..92d522db0022 100644 --- a/twin/src/main/java/com/iluwatar/twin/BallThread.java +++ b/twin/src/main/java/com/iluwatar/twin/BallThread.java @@ -24,96 +24,76 @@ */ package com.iluwatar.twin; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import lombok.Setter; import lombok.extern.slf4j.Slf4j; /** - * This class represents a UI thread for drawing the {@link BallItem} and provides methods - * for suspension and resumption. It holds a reference to {@link BallItem} to delegate the draw task. + * This class is a UI thread for drawing the {@link BallItem}, and provide the method for suspend + * and resume. It holds the reference of {@link BallItem} to delegate the draw task. */ + @Slf4j public class BallThread extends Thread { @Setter private BallItem twin; - private final Lock lock = new ReentrantLock(); - private final Condition notSuspended = lock.newCondition(); + private volatile boolean isSuspended; - private volatile boolean isSuspended = false; private volatile boolean isRunning = true; + private final Object lock = new Object(); + /** - * Run the thread to continuously draw and move the ball unless suspended. + * Run the thread. */ - @Override public void run() { - while (isRunning) { - lock.lock(); - try { - while (isSuspended) { - LOGGER.info("BallThread suspended."); - notSuspended.await(); // Wait until resumed + try { + while (isRunning) { + if (isSuspended) { + synchronized (lock) { + lock.wait(); + } + } else { + twin.draw(); + twin.move(); + Thread.sleep(250); } - } catch (InterruptedException e) { - LOGGER.info("BallThread interrupted.", e); - Thread.currentThread().interrupt(); // Re-interrupt the thread for proper handling - break; - } finally { - lock.unlock(); - } - - // Perform the twin's tasks if not suspended - try { - twin.draw(); - twin.move(); - Thread.sleep(250); // Introduce a controlled interval between actions - } catch (InterruptedException e) { - LOGGER.info("BallThread interrupted during sleep.", e); - Thread.currentThread().interrupt(); // Handle interrupt properly - break; } + } catch (InterruptedException e) { + throw new RuntimeException(e); } - - LOGGER.info("BallThread has stopped."); } - /** - * Suspend the thread's operations. + * suspend the thread. */ public void suspendMe() { - lock.lock(); - try { - isSuspended = true; - LOGGER.info("Suspending BallThread."); - } finally { - lock.unlock(); - } + isSuspended = true; + LOGGER.info("Begin to suspend BallThread"); } /** - * Resume the thread's operations. + * notify run to resume. */ + public void resumeMe() { - lock.lock(); - try { - isSuspended = false; - notSuspended.signal(); // Notify the thread to resume - LOGGER.info("Resuming BallThread."); - } finally { - lock.unlock(); + isSuspended = false; + LOGGER.info("Begin to resume BallThread"); + + synchronized (lock) { + lock.notifyAll(); } } /** - * Stop the thread's operations. + * Stop running thread. */ public void stopMe() { - isRunning = false; - this.interrupt(); // Interrupt the thread to terminate - LOGGER.info("Stopping BallThread."); + this.isRunning = false; + this.isSuspended = true; + synchronized (lock) { + lock.notifyAll(); + } } } + diff --git a/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java b/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java index 26cf78509dcf..244d11a6dfaa 100644 --- a/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java +++ b/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java @@ -27,6 +27,7 @@ import static java.lang.Thread.UncaughtExceptionHandler; import static java.lang.Thread.sleep; import static java.time.Duration.ofMillis; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTimeout; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -35,12 +36,14 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; /** * BallThreadTest * */ +@Slf4j class BallThreadTest { /** @@ -59,12 +62,12 @@ void testSuspend() { verify(ballItem, atLeastOnce()).draw(); verify(ballItem, atLeastOnce()).move(); ballThread.suspendMe(); - sleep(1000); + LOGGER.info("Current ballThread State: "+ballThread.getState()); + assertEquals(ballThread.getState(), Thread.State.WAITING); ballThread.stopMe(); ballThread.join(); - verifyNoMoreInteractions(ballItem); }); } @@ -86,8 +89,9 @@ void testResume() { sleep(1000); verifyNoMoreInteractions(ballItem); - ballThread.resumeMe(); + LOGGER.info("Current ballThread State: "+ballThread.getState()); + assertEquals(ballThread.getState(), Thread.State.RUNNABLE); sleep(300); verify(ballItem, atLeastOnce()).draw(); verify(ballItem, atLeastOnce()).move();