Skip to content

Commit ed41fae

Browse files
fixed code
This project contributed to the Java Design Patterns repository by addressing an issue related to eliminating busy-waiting loops. The code was refactored to improve performance, adhere to Object-Oriented (OO) principles, and enhance maintainability. Key fixes included replacing busy-waiting with efficient mechanisms like wait/notify and ScheduledExecutorService, improving thread safety and code clarity. Comprehensive unit tests were added to validate functionality under various scenarios, including concurrency and interruption handling. These changes ensure robust, modern implementations aligned with OOAD principles.
1 parent adbddcb commit ed41fae

File tree

9 files changed

+586
-315
lines changed

9 files changed

+586
-315
lines changed

commander/src/main/java/com/iluwatar/commander/Retry.java renamed to commander/src/main/java/com/iluwatar/commander/HandleErrorIssue.java

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,50 +23,81 @@
2323
* THE SOFTWARE.
2424
*/
2525
package com.iluwatar.commander;
26-
2726
import java.security.SecureRandom;
2827
import java.util.ArrayList;
2928
import java.util.Arrays;
3029
import java.util.List;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ScheduledExecutorService;
32+
import java.util.concurrent.TimeUnit;
3133
import java.util.concurrent.atomic.AtomicInteger;
3234
import java.util.function.Predicate;
3335

36+
37+
38+
39+
40+
41+
42+
43+
Expand Down
44+
45+
46+
47+
48+
49+
Expand Up
50+
51+
@@ -59,6 +62,7 @@ public interface HandleErrorIssue<T> {
52+
3453
/**
3554
* Retry pattern.
3655
*
3756
* @param <T> is the type of object passed into HandleErrorIssue as a parameter.
3857
*/
39-
4058
public class Retry<T> {
41-
4259
/**
4360
* Operation Interface will define method to be implemented.
4461
*/
45-
4662
public interface Operation {
4763
void operation(List<Exception> list) throws Exception;
4864
}
49-
5065
/**
5166
* HandleErrorIssue defines how to handle errors.
5267
*
5368
* @param <T> is the type of object to be passed into the method as parameter.
5469
*/
55-
5670
public interface HandleErrorIssue<T> {
5771
void handleIssue(T obj, Exception e);
5872
}
5973

6074
private static final SecureRandom RANDOM = new SecureRandom();
6175

76+
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
6277
private final Operation op;
6378
private final HandleErrorIssue<T> handleError;
6479
private final int maxAttempts;
80+
81+
82+
83+
84+
85+
86+
87+
Expand Down
88+
89+
90+
91+
92+
93+
Expand Up
94+
95+
@@ -86,26 +90,25 @@ public interface HandleErrorIssue<T> {
96+
6597
private final long maxDelay;
6698
private final AtomicInteger attempts;
6799
private final Predicate<Exception> test;
68100
private final List<Exception> errors;
69-
70101
Retry(Operation op, HandleErrorIssue<T> handleError, int maxAttempts,
71102
long maxDelay, Predicate<Exception>... ignoreTests) {
72103
this.op = op;
@@ -77,7 +108,6 @@ public interface HandleErrorIssue<T> {
77108
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
78109
this.errors = new ArrayList<>();
79110
}
80-
81111
/**
82112
* Performing the operation with retries.
83113
*
@@ -86,26 +116,25 @@ public interface HandleErrorIssue<T> {
86116
*/
87117

88118
public void perform(List<Exception> list, T obj) {
89-
do {
119+
scheduler.schedule(() -> {
90120
try {
91121
op.operation(list);
92-
return;
93-
} catch (Exception e) {
122+
}catch (Exception e){
94123
this.errors.add(e);
95124
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
96125
this.handleError.handleIssue(obj, e);
126+
scheduler.shutdown();
97127
return; //return here... don't go further
98128
}
99-
try {
100-
long testDelay =
101-
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
102-
long delay = Math.min(testDelay, this.maxDelay);
103-
Thread.sleep(delay);
104-
} catch (InterruptedException f) {
105-
//ignore
106-
}
129+
perform(list, obj);
107130
}
108-
} while (true);
131+
}, calculateDelay(), TimeUnit.MILLISECONDS);
132+
}
133+
134+
private long calculateDelay(){
135+
long testDelay =
136+
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
137+
return Math.min(testDelay, this.maxDelay);
109138
}
110139

111-
}
140+
}

microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,22 @@
2525
package com.iluwatar.logaggregation;
2626

2727
import java.util.concurrent.ConcurrentLinkedQueue;
28-
import java.util.concurrent.ExecutorService;
2928
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ScheduledExecutorService;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.concurrent.atomic.AtomicInteger;
3232
import lombok.extern.slf4j.Slf4j;
3333

34+
35+
36+
37+
38+
39+
40+
Expand All
41+
42+
@@ -45,7 +45,7 @@ public class LogAggregator {
43+
3444
/**
3545
* Responsible for collecting and buffering logs from different services.
3646
* Once the logs reach a certain threshold or after a certain time interval,
@@ -40,15 +50,31 @@
4050
*/
4151
@Slf4j
4252
public class LogAggregator {
43-
4453
private static final int BUFFER_THRESHOLD = 3;
4554
private final CentralLogStore centralLogStore;
4655
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
4756
private final LogLevel minLogLevel;
48-
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
57+
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
4958
private final AtomicInteger logCount = new AtomicInteger(0);
5059

5160
/**
61+
62+
63+
64+
65+
66+
67+
68+
Expand Down
69+
70+
71+
72+
73+
74+
Expand Up
75+
76+
@@ -90,8 +90,8 @@ public void collectLog(LogEntry logEntry) {
77+
5278
* constructor of LogAggregator.
5379
*
5480
* @param centralLogStore central log store implement
@@ -59,7 +85,6 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
5985
this.minLogLevel = minLogLevel;
6086
startBufferFlusher();
6187
}
62-
6388
/**
6489
* Collects a given log entry, and filters it by the defined log level.
6590
*
@@ -70,33 +95,39 @@ public void collectLog(LogEntry logEntry) {
7095
LOGGER.warn("Log level or threshold level is null. Skipping.");
7196
return;
7297
}
73-
7498
if (logEntry.getLevel().compareTo(minLogLevel) < 0) {
7599
LOGGER.debug("Log level below threshold. Skipping.");
76100
return;
77101
}
78-
79102
buffer.offer(logEntry);
80-
81103
if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
82104
flushBuffer();
83105
}
84106
}
85-
86107
/**
87108
* Stops the log aggregator service and flushes any remaining logs to
88109
* the central log store.
89110
*
90111
* @throws InterruptedException If any thread has interrupted the current thread.
91112
*/
92113
public void stop() throws InterruptedException {
93-
executorService.shutdownNow();
94-
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
114+
scheduler.shutdownNow();
115+
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
95116
LOGGER.error("Log aggregator did not terminate.");
96117
}
97118
flushBuffer();
98-
}
99119

120+
121+
122+
123+
124+
125+
126+
Expand All
127+
128+
@@ -106,15 +106,7 @@ private void flushBuffer() {
129+
130+
}
100131
private void flushBuffer() {
101132
LogEntry logEntry;
102133
while ((logEntry = buffer.poll()) != null) {
@@ -106,15 +137,7 @@ private void flushBuffer() {
106137
}
107138

108139
private void startBufferFlusher() {
109-
executorService.execute(() -> {
110-
while (!Thread.currentThread().isInterrupted()) {
111-
try {
112-
Thread.sleep(5000); // Flush every 5 seconds.
113-
flushBuffer();
114-
} catch (InterruptedException e) {
115-
Thread.currentThread().interrupt();
116-
}
117-
}
118-
});
140+
//flush every 5 seconds
141+
scheduler.scheduleWithFixedDelay(this::flushBuffer, 0, 5000, TimeUnit.MILLISECONDS);
119142
}
120-
}
143+
}

queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,26 @@
2626

2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.Executors;
29-
import java.util.concurrent.TimeUnit;
3029
import lombok.extern.slf4j.Slf4j;
3130

3231
/**
32+
33+
34+
35+
36+
37+
38+
39+
Expand Down
40+
41+
42+
43+
44+
45+
Expand Up
46+
47+
@@ -104,12 +103,7 @@ public static void main(String[] args) {
48+
3349
* Many solutions in the cloud involve running tasks that invoke services. In this environment, if a
3450
* service is subjected to intermittent heavy loads, it can cause performance or reliability
3551
* issues.
@@ -60,58 +76,54 @@
6076
*/
6177
@Slf4j
6278
public class App {
63-
6479
//Executor shut down time limit.
6580
private static final int SHUTDOWN_TIME = 15;
66-
6781
/**
6882
* Program entry point.
6983
*
7084
* @param args command line args
7185
*/
7286
public static void main(String[] args) {
73-
7487
// An Executor that provides methods to manage termination and methods that can
7588
// produce a Future for tracking progress of one or more asynchronous tasks.
7689
ExecutorService executor = null;
77-
7890
try {
7991
// Create a MessageQueue object.
8092
var msgQueue = new MessageQueue();
81-
8293
LOGGER.info("Submitting TaskGenerators and ServiceExecutor threads.");
83-
8494
// Create three TaskGenerator threads. Each of them will submit different number of jobs.
8595
final var taskRunnable1 = new TaskGenerator(msgQueue, 5);
8696
final var taskRunnable2 = new TaskGenerator(msgQueue, 1);
8797
final var taskRunnable3 = new TaskGenerator(msgQueue, 2);
88-
8998
// Create e service which should process the submitted jobs.
9099
final var srvRunnable = new ServiceExecutor(msgQueue);
91-
92100
// Create a ThreadPool of 2 threads and
93101
// submit all Runnable task for execution to executor
94102
executor = Executors.newFixedThreadPool(2);
95103
executor.submit(taskRunnable1);
96104
executor.submit(taskRunnable2);
97105
executor.submit(taskRunnable3);
98-
99106
// submitting serviceExecutor thread to the Executor service.
100107
executor.submit(srvRunnable);
101-
102108
// Initiates an orderly shutdown.
103109
LOGGER.info("Initiating shutdown."
104110
+ " Executor will shutdown only after all the Threads are completed.");
105111
executor.shutdown();
106112

107-
// Wait for SHUTDOWN_TIME seconds for all the threads to complete
108-
// their tasks and then shut down the executor and then exit.
109-
if (!executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
110-
LOGGER.info("Executor was shut down and Exiting.");
111-
executor.shutdownNow();
112-
}
113+
srvRunnable.shutdown(SHUTDOWN_TIME);
113114
} catch (Exception e) {
114115
LOGGER.error(e.getMessage());
115116
}
117+
118+
119+
120+
121+
122+
123+
124+
Expand Down
125+
126+
127+
116128
}
117129
}

0 commit comments

Comments
 (0)