Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 43 additions & 71 deletions commander/src/main/java/com/iluwatar/commander/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,84 +28,56 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.*;
import java.util.function.Predicate;

/**
* Retry pattern.
*
* @param <T> is the type of object passed into HandleErrorIssue as a parameter.
*/

public class Retry<T> {

/**
* Operation Interface will define method to be implemented.
*/

public interface Operation {
void operation(List<Exception> list) throws Exception;
}

/**
* HandleErrorIssue defines how to handle errors.
*
* @param <T> is the type of object to be passed into the method as parameter.
*/

public interface HandleErrorIssue<T> {
void handleIssue(T obj, Exception e);
}
public interface Operation {
void operation(List<Exception> list) throws Exception;
}

private static final SecureRandom RANDOM = new SecureRandom();
public interface HandleErrorIssue<T> {
void handleIssue(T obj, Exception e);
}

private final Operation op;
private final HandleErrorIssue<T> handleError;
private final int maxAttempts;
private final long maxDelay;
private final AtomicInteger attempts;
private final Predicate<Exception> test;
private final List<Exception> errors;
private static final SecureRandom RANDOM = new SecureRandom();

Retry(Operation op, HandleErrorIssue<T> handleError, int maxAttempts,
long maxDelay, Predicate<Exception>... ignoreTests) {
this.op = op;
this.handleError = handleError;
this.maxAttempts = maxAttempts;
this.maxDelay = maxDelay;
this.attempts = new AtomicInteger();
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
this.errors = new ArrayList<>();
}
private final Operation op;
private final HandleErrorIssue<T> handleError;
private final int maxAttempts;
private final long maxDelay;
private final AtomicInteger attempts;
private final Predicate<Exception> test;
private final List<Exception> errors;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

/**
* Performing the operation with retries.
*
* @param list is the exception list
* @param obj is the parameter to be passed into handleIsuue method
*/
Retry(Operation op, HandleErrorIssue<T> handleError, int maxAttempts, long maxDelay, Predicate<Exception>... ignoreTests) {
this.op = op;
this.handleError = handleError;
this.maxAttempts = maxAttempts;
this.maxDelay = maxDelay;
this.attempts = new AtomicInteger();
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
this.errors = new ArrayList<>();
}

public void perform(List<Exception> list, T obj) {
do {
try {
op.operation(list);
return;
} catch (Exception e) {
this.errors.add(e);
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
this.handleError.handleIssue(obj, e);
return; //return here... don't go further
}
try {
long testDelay =
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
long delay = Math.min(testDelay, this.maxDelay);
Thread.sleep(delay);
} catch (InterruptedException f) {
//ignore
}
}
} while (true);
}
public void perform(List<Exception> list, T obj) {
do {
try {
op.operation(list);
return;
} catch (Exception e) {
this.errors.add(e);
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
this.handleError.handleIssue(obj, e);
return;
}

}
long testDelay = (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
long delay = Math.min(testDelay, this.maxDelay);
executorService.schedule(() -> {}, delay, TimeUnit.MILLISECONDS); // Schedule retry without blocking
}
} while (true);
}
}
Original file line number Diff line number Diff line change
@@ -1,58 +1,34 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2022 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;

/**
* Responsible for collecting and buffering logs from different services.
* Once the logs reach a certain threshold or after a certain time interval,
* they are flushed to the central log store. This class ensures logs are collected
* and processed asynchronously and efficiently, providing both an immediate collection
* and periodic flushing.
* Collects and buffers logs from different services, periodically flushing them
* to a central log store based on a time interval or buffer threshold.
*/
@Slf4j
public class LogAggregator {

private static final int BUFFER_THRESHOLD = 3;
private static final int FLUSH_INTERVAL = 5; // Interval in seconds for periodic flushing

private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicInteger logCount = new AtomicInteger(0);

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

/**
* constructor of LogAggregator.
* Constructor of LogAggregator.
*
* @param centralLogStore central log store implement
* @param minLogLevel min log level to store log
* @param centralLogStore central log store implementation
* @param minLogLevel minimum log level to store log
*/
public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
this.centralLogStore = centralLogStore;
Expand All @@ -61,7 +37,8 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
}

/**
* Collects a given log entry, and filters it by the defined log level.
* Collects a log entry and buffers it for eventual flushing to the central log store.
* Filters logs based on the configured minimum log level.
*
* @param logEntry The log entry to collect.
*/
Expand All @@ -87,34 +64,41 @@ public void collectLog(LogEntry logEntry) {
* Stops the log aggregator service and flushes any remaining logs to
* the central log store.
*
* @throws InterruptedException If any thread has interrupted the current thread.
* @throws InterruptedException If interrupted while shutting down.
*/
public void stop() throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
LOGGER.info("Stopping log aggregator...");
scheduler.shutdown();
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate cleanly.");
}
flushBuffer();
}

/**
* Flushes the buffered logs to the central log store.
*/
private void flushBuffer() {
LOGGER.info("Flushing buffer...");
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null) {
centralLogStore.storeLog(logEntry);
logCount.decrementAndGet();
}
LOGGER.info("Buffer flushed.");
}

/**
* Starts the periodic buffer flusher task using a scheduled executor service.
*/
private void startBufferFlusher() {
executorService.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduler.scheduleAtFixedRate(() -> {
try {
LOGGER.info("Periodic buffer flush initiated...");
flushBuffer();
} catch (Exception e) {
LOGGER.error("Error during buffer flush", e);
}
});
}, FLUSH_INTERVAL, FLUSH_INTERVAL, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,32 @@
*/
package com.iluwatar.queue.load.leveling;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

/**
* ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and
* process them.
*/
@Slf4j
public class ServiceExecutor implements Runnable {
private final BlockingQueue<String> msgQueue;

private final MessageQueue msgQueue;

public ServiceExecutor(MessageQueue msgQueue) {
this.msgQueue = msgQueue;
}
public ServiceExecutor(BlockingQueue<String> msgQueue) {
this.msgQueue = msgQueue;
}

/**
* The ServiceExecutor thread will retrieve each message and process it.
*/
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
var msg = msgQueue.retrieveMsg();
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
String msg = msgQueue.poll(1, TimeUnit.SECONDS); // Wait for message with timeout

if (null != msg) {
LOGGER.info(msg + " is served.");
} else {
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
if (msg != null) {
LOGGER.info(msg + " is served.");
} else {
LOGGER.info("Service Executor: Waiting for Messages to serve...");
}
}
} catch (InterruptedException e) {
LOGGER.error("Service Executor interrupted: " + e.getMessage());
}

Thread.sleep(1000);
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}
Loading
Loading