Skip to content

Commit d5864d9

Browse files
updated
1 parent 8aa0647 commit d5864d9

File tree

11 files changed

+378
-151
lines changed

11 files changed

+378
-151
lines changed
Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,59 @@
1+
/*
2+
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
3+
*
4+
* The MIT License
5+
* Copyright © 2014-2022 Ilkka Seppälä
6+
*
7+
* Permission is hereby granted, free of charge, to any person obtaining a copy
8+
* of this software and associated documentation files (the "Software"), to deal
9+
* in the Software without restriction, including without limitation the rights
10+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
* copies of the Software, and to permit persons to whom the Software is
12+
* furnished to do so, subject to the following conditions:
13+
*
14+
* The above copyright notice and this permission notice shall be included in
15+
* all copies or substantial portions of the Software.
16+
*
17+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
* THE SOFTWARE.
24+
*/
125
package com.iluwatar.logaggregation;
226

327
import java.util.concurrent.ConcurrentLinkedQueue;
28+
import java.util.concurrent.ExecutorService;
429
import java.util.concurrent.Executors;
5-
import java.util.concurrent.ScheduledExecutorService;
630
import java.util.concurrent.TimeUnit;
731
import java.util.concurrent.atomic.AtomicInteger;
832
import lombok.extern.slf4j.Slf4j;
933

1034
/**
11-
* Collects and buffers logs from different services, periodically flushing them
12-
* to a central log store based on a time interval or buffer threshold.
35+
* Responsible for collecting and buffering logs from different services.
36+
* Once the logs reach a certain threshold or after a certain time interval,
37+
* they are flushed to the central log store. This class ensures logs are collected
38+
* and processed asynchronously and efficiently, providing both an immediate collection
39+
* and periodic flushing.
1340
*/
1441
@Slf4j
1542
public class LogAggregator {
1643

1744
private static final int BUFFER_THRESHOLD = 3;
18-
private static final int FLUSH_INTERVAL = 5; // Interval in seconds for periodic flushing
19-
2045
private final CentralLogStore centralLogStore;
2146
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
2247
private final LogLevel minLogLevel;
48+
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
49+
private final Object bufferWait = new Object();
2350
private final AtomicInteger logCount = new AtomicInteger(0);
2451

25-
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
26-
2752
/**
28-
* Constructor of LogAggregator.
53+
* constructor of LogAggregator.
2954
*
30-
* @param centralLogStore central log store implementation
31-
* @param minLogLevel minimum log level to store log
55+
* @param centralLogStore central log store implement
56+
* @param minLogLevel min log level to store log
3257
*/
3358
public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
3459
this.centralLogStore = centralLogStore;
@@ -37,8 +62,7 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
3762
}
3863

3964
/**
40-
* Collects a log entry and buffers it for eventual flushing to the central log store.
41-
* Filters logs based on the configured minimum log level.
65+
* Collects a given log entry, and filters it by the defined log level.
4266
*
4367
* @param logEntry The log entry to collect.
4468
*/
@@ -54,6 +78,7 @@ public void collectLog(LogEntry logEntry) {
5478
}
5579

5680
buffer.offer(logEntry);
81+
bufferWake();
5782

5883
if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
5984
flushBuffer();
@@ -64,41 +89,48 @@ public void collectLog(LogEntry logEntry) {
6489
* Stops the log aggregator service and flushes any remaining logs to
6590
* the central log store.
6691
*
67-
* @throws InterruptedException If interrupted while shutting down.
92+
* @throws InterruptedException If any thread has interrupted the current thread.
6893
*/
6994
public void stop() throws InterruptedException {
70-
LOGGER.info("Stopping log aggregator...");
71-
scheduler.shutdown();
72-
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
73-
LOGGER.error("Log aggregator did not terminate cleanly.");
95+
executorService.shutdownNow();
96+
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
97+
LOGGER.error("Log aggregator did not terminate.");
7498
}
7599
flushBuffer();
76100
}
77101

78-
/**
79-
* Flushes the buffered logs to the central log store.
80-
*/
81102
private void flushBuffer() {
82-
LOGGER.info("Flushing buffer...");
83103
LogEntry logEntry;
84104
while ((logEntry = buffer.poll()) != null) {
85105
centralLogStore.storeLog(logEntry);
86106
logCount.decrementAndGet();
87107
}
88-
LOGGER.info("Buffer flushed.");
89108
}
90109

91-
/**
92-
* Starts the periodic buffer flusher task using a scheduled executor service.
93-
*/
94110
private void startBufferFlusher() {
95-
scheduler.scheduleAtFixedRate(() -> {
96-
try {
97-
LOGGER.info("Periodic buffer flush initiated...");
98-
flushBuffer();
99-
} catch (Exception e) {
100-
LOGGER.error("Error during buffer flush", e);
111+
executorService.execute(() -> {
112+
while (!Thread.currentThread().isInterrupted()) {
113+
try {
114+
synchronized (bufferWait) {
115+
if (buffer.isEmpty()) {
116+
bufferWait.wait();
117+
}
118+
}
119+
Thread.sleep(5000); // Flush every 5 seconds.
120+
flushBuffer();
121+
} catch (InterruptedException e) {
122+
Thread.currentThread().interrupt();
123+
}
101124
}
102-
}, FLUSH_INTERVAL, FLUSH_INTERVAL, TimeUnit.SECONDS);
125+
});
126+
}
127+
128+
/**
129+
* Wakes up buffer.
130+
*/
131+
public void bufferWake() {
132+
synchronized (bufferWait) {
133+
bufferWait.notifyAll();
134+
}
103135
}
104136
}

queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/MessageQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
public class MessageQueue {
3737

3838
private final BlockingQueue<Message> blkQueue;
39+
public final Object serviceExecutorWait = new Object();
3940

4041
// Default constructor when called creates Blocking Queue object.
4142
public MessageQueue() {
@@ -50,6 +51,9 @@ public void submitMsg(Message msg) {
5051
try {
5152
if (null != msg) {
5253
blkQueue.add(msg);
54+
synchronized (serviceExecutorWait) {
55+
serviceExecutorWait.notifyAll();
56+
}
5357
}
5458
} catch (Exception e) {
5559
LOGGER.error(e.getMessage());

queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,32 +24,38 @@
2424
*/
2525
package com.iluwatar.queue.load.leveling;
2626

27-
import java.util.concurrent.BlockingQueue;
28-
import java.util.concurrent.TimeUnit;
2927
import lombok.extern.slf4j.Slf4j;
3028

29+
/**
30+
* ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and
31+
* process them.
32+
*/
3133
@Slf4j
3234
public class ServiceExecutor implements Runnable {
33-
private final BlockingQueue<String> msgQueue;
34-
35-
public ServiceExecutor(BlockingQueue<String> msgQueue) {
36-
this.msgQueue = msgQueue;
37-
}
35+
private final MessageQueue msgQueue;
36+
public ServiceExecutor(MessageQueue msgQueue) {
37+
this.msgQueue = msgQueue;
38+
}
3839

39-
@Override
40-
public void run() {
41-
try {
42-
while (!Thread.currentThread().isInterrupted()) {
43-
String msg = msgQueue.poll(1, TimeUnit.SECONDS); // Wait for message with timeout
40+
/**
41+
* The ServiceExecutor thread will retrieve each message and process it.
42+
*/
43+
public void run() {
44+
try {
45+
while (!Thread.currentThread().isInterrupted()) {
46+
var msg = msgQueue.retrieveMsg();
4447

45-
if (msg != null) {
46-
LOGGER.info(msg + " is served.");
47-
} else {
48-
LOGGER.info("Service Executor: Waiting for Messages to serve...");
49-
}
50-
}
51-
} catch (InterruptedException e) {
52-
LOGGER.error("Service Executor interrupted: " + e.getMessage());
48+
if (null != msg) {
49+
LOGGER.info(msg + " is served.");
50+
} else {
51+
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
52+
synchronized (msgQueue.serviceExecutorWait) {
53+
msgQueue.serviceExecutorWait.wait();
54+
}
5355
}
56+
}
57+
} catch (Exception e) {
58+
LOGGER.error(e.getMessage());
5459
}
60+
}
5561
}

queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,8 @@ public void run() {
6666
try {
6767
while (count > 0) {
6868
var statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName();
69-
this.submit(new Message(statusMsg));
70-
7169
LOGGER.info(statusMsg);
70+
this.submit(new Message(statusMsg));
7271

7372
// reduce the message count.
7473
count--;

queue-based-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,19 @@
2424
*/
2525
package com.iluwatar.queue.load.leveling;
2626

27+
import static java.util.concurrent.CompletableFuture.anyOf;
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
2729
import static org.junit.jupiter.api.Assertions.assertNotNull;
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
2831

32+
import lombok.extern.slf4j.Slf4j;
2933
import org.junit.jupiter.api.Test;
3034

3135
/**
3236
* Test case for submitting Message to Blocking Queue by TaskGenerator and retrieve the message by
3337
* ServiceExecutor.
3438
*/
39+
@Slf4j
3540
class TaskGenSrvExeTest {
3641

3742
@Test
@@ -53,4 +58,37 @@ void taskGeneratorTest() {
5358
assertNotNull(srvExeThr);
5459
}
5560

61+
/**
62+
* Tests that service executor waits at start since no message is sent to execute upon.
63+
* @throws InterruptedException
64+
*/
65+
@Test
66+
void serviceExecutorStartStateTest() throws InterruptedException {
67+
var msgQueue = new MessageQueue();
68+
var srvRunnable = new ServiceExecutor(msgQueue);
69+
var srvExeThr = new Thread(srvRunnable);
70+
srvExeThr.start();
71+
Thread.sleep(200); // sleep a little until service executor thread waits
72+
LOGGER.info("Current Service Executor State: " + srvExeThr.getState());
73+
assertEquals(srvExeThr.getState(), Thread.State.WAITING);
74+
75+
}
76+
77+
@Test
78+
void serviceExecutorWakeStateTest() throws InterruptedException {
79+
var msgQueue = new MessageQueue();
80+
var srvRunnable = new ServiceExecutor(msgQueue);
81+
var srvExeThr = new Thread(srvRunnable);
82+
srvExeThr.start();
83+
Thread.sleep(200); // sleep a little until service executor thread waits
84+
synchronized (msgQueue.serviceExecutorWait){
85+
msgQueue.serviceExecutorWait.notifyAll();
86+
}
87+
var srvExeState = srvExeThr.getState();
88+
LOGGER.info("Current Service Executor State: " + srvExeState);
89+
// assert that state changes from waiting
90+
assertTrue(srvExeState != Thread.State.WAITING);
91+
92+
}
93+
5694
}

0 commit comments

Comments
 (0)