Skip to content

Commit b186ff5

Browse files
docs: #2255 Add explanation into README.md for Queue-load-leveling pattern (#2707)
* fix: #2255 modify README.md file under queue-load-leveling. - add explanation section - add plai-word and wiki say - add programmatic example with code of taskGenerator and ServiceExecutor classes - add console output by running the code at local * ref: add '## explanation' line above real-world example
1 parent 5d92ca9 commit b186ff5

File tree

1 file changed

+276
-3
lines changed

1 file changed

+276
-3
lines changed

queue-load-leveling/README.md

Lines changed: 276 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,281 @@ intermittent heavy loads that may otherwise cause the service to fail or the tas
1414
This pattern can help to minimize the impact of peaks in demand on availability and responsiveness
1515
for both the task and the service.
1616

17+
## Explanation
18+
19+
Real world example
20+
> A Microsoft Azure web role stores data by using a separate storage service. If a large number of instances of the web
21+
> role run concurrently, it is possible that the storage service could be overwhelmed and be unable to respond to requests
22+
> quickly enough to prevent these requests from timing out or failing.
23+
24+
In plain words
25+
> Makes resource-load balanced by ensuring an intermediate data structure like queue that makes bridge
26+
> between service-takers and service-givers. Where both takers and givers are running asynchronously and
27+
> service-takers can tolerate some amount of delay to get feedback.
28+
>
29+
30+
Wikipedia says
31+
32+
> In computing, load balancing is the process of distributing a set of tasks over a set of resources
33+
> (computing units), with the aim of making their overall processing more efficient. Load balancing can
34+
> optimize the response time and avoid unevenly overloading some compute nodes while other compute nodes
35+
> are left idle.
36+
37+
**Programmatic Example**
38+
39+
TaskGenerator implements Task, runnable interfaces. Hence, It runs asynchronously.
40+
41+
```java
42+
/**
43+
* Task Interface.
44+
*/
45+
public interface Task {
46+
void submit(Message msg);
47+
}
48+
```
49+
It submits tasks to ServiceExecutor to serve tasks.
50+
```java
51+
/**
52+
* TaskGenerator class. Each TaskGenerator thread will be a Worker which submit's messages to the
53+
* queue. We need to mention the message count for each of the TaskGenerator threads.
54+
*/
55+
@Slf4j
56+
public class TaskGenerator implements Task, Runnable {
57+
58+
// MessageQueue reference using which we will submit our messages.
59+
private final MessageQueue msgQueue;
60+
61+
// Total message count that a TaskGenerator will submit.
62+
private final int msgCount;
63+
64+
// Parameterized constructor.
65+
public TaskGenerator(MessageQueue msgQueue, int msgCount) {
66+
this.msgQueue = msgQueue;
67+
this.msgCount = msgCount;
68+
}
69+
70+
/**
71+
* Submit messages to the Blocking Queue.
72+
*/
73+
public void submit(Message msg) {
74+
try {
75+
this.msgQueue.submitMsg(msg);
76+
} catch (Exception e) {
77+
LOGGER.error(e.getMessage());
78+
}
79+
}
80+
81+
/**
82+
* Each TaskGenerator thread will submit all the messages to the Queue. After every message
83+
* submission TaskGenerator thread will sleep for 1 second.
84+
*/
85+
public void run() {
86+
var count = this.msgCount;
87+
88+
try {
89+
while (count > 0) {
90+
var statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName();
91+
this.submit(new Message(statusMsg));
92+
93+
LOGGER.info(statusMsg);
94+
95+
// reduce the message count.
96+
count--;
97+
98+
// Make the current thread to sleep after every Message submission.
99+
Thread.sleep(1000);
100+
}
101+
} catch (Exception e) {
102+
LOGGER.error(e.getMessage());
103+
}
104+
}
105+
}
106+
```
107+
It also implements runnable interface and run asynchronously. It retrieves tasks one by one
108+
from blockingQueue to serve.
109+
```java
110+
/**
111+
* ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and
112+
* process them.
113+
*/
114+
@Slf4j
115+
public class ServiceExecutor implements Runnable {
116+
117+
private final MessageQueue msgQueue;
118+
119+
public ServiceExecutor(MessageQueue msgQueue) {
120+
this.msgQueue = msgQueue;
121+
}
122+
123+
/**
124+
* The ServiceExecutor thread will retrieve each message and process it.
125+
*/
126+
public void run() {
127+
try {
128+
while (!Thread.currentThread().isInterrupted()) {
129+
var msg = msgQueue.retrieveMsg();
130+
131+
if (null != msg) {
132+
LOGGER.info(msg.toString() + " is served.");
133+
} else {
134+
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
135+
}
136+
137+
Thread.sleep(1000);
138+
}
139+
} catch (Exception e) {
140+
LOGGER.error(e.getMessage());
141+
}
142+
}
143+
}
144+
```
145+
146+
BlockingQueue data-structure is used in MessageQueue class for acting buffer
147+
between TaskGenerator to ServiceExecutor.
148+
149+
```java
150+
public class MessageQueue {
151+
152+
private final BlockingQueue<Message> blkQueue;
153+
154+
// Default constructor when called creates Blocking Queue object.
155+
public MessageQueue() {
156+
this.blkQueue = new ArrayBlockingQueue<>(1024);
157+
}
158+
159+
/**
160+
* All the TaskGenerator threads will call this method to insert the Messages in to the Blocking
161+
* Queue.
162+
*/
163+
public void submitMsg(Message msg) {
164+
try {
165+
if (null != msg) {
166+
blkQueue.add(msg);
167+
}
168+
} catch (Exception e) {
169+
LOGGER.error(e.getMessage());
170+
}
171+
}
172+
173+
/**
174+
* All the messages will be retrieved by the ServiceExecutor by calling this method and process
175+
* them. Retrieves and removes the head of this queue, or returns null if this queue is empty.
176+
*/
177+
public Message retrieveMsg() {
178+
try {
179+
return blkQueue.poll();
180+
} catch (Exception e) {
181+
LOGGER.error(e.getMessage());
182+
}
183+
return null;
184+
}
185+
}
186+
```
187+
TaskGenerator submit message object to ServiceExecutor for serving.
188+
```java
189+
/**
190+
* Message class with only one parameter.
191+
*/
192+
@Getter
193+
@RequiredArgsConstructor
194+
public class Message {
195+
private final String msg;
196+
197+
@Override
198+
public String toString() {
199+
return msg;
200+
}
201+
}
202+
```
203+
To simulate the situation ExecutorService is used here. ExecutorService automatically provides a pool of threads and
204+
an API for assigning tasks to it.
205+
```java
206+
public class App {
207+
208+
//Executor shut down time limit.
209+
private static final int SHUTDOWN_TIME = 15;
210+
211+
/**
212+
* Program entry point.
213+
*
214+
* @param args command line args
215+
*/
216+
public static void main(String[] args) {
217+
218+
// An Executor that provides methods to manage termination and methods that can
219+
// produce a Future for tracking progress of one or more asynchronous tasks.
220+
ExecutorService executor = null;
221+
222+
try {
223+
// Create a MessageQueue object.
224+
var msgQueue = new MessageQueue();
225+
226+
LOGGER.info("Submitting TaskGenerators and ServiceExecutor threads.");
227+
228+
// Create three TaskGenerator threads. Each of them will submit different number of jobs.
229+
final var taskRunnable1 = new TaskGenerator(msgQueue, 5);
230+
final var taskRunnable2 = new TaskGenerator(msgQueue, 1);
231+
final var taskRunnable3 = new TaskGenerator(msgQueue, 2);
232+
233+
// Create e service which should process the submitted jobs.
234+
final var srvRunnable = new ServiceExecutor(msgQueue);
235+
236+
// Create a ThreadPool of 2 threads and
237+
// submit all Runnable task for execution to executor..
238+
executor = Executors.newFixedThreadPool(2);
239+
executor.submit(taskRunnable1);
240+
executor.submit(taskRunnable2);
241+
executor.submit(taskRunnable3);
242+
243+
// submitting serviceExecutor thread to the Executor service.
244+
executor.submit(srvRunnable);
245+
246+
// Initiates an orderly shutdown.
247+
LOGGER.info("Initiating shutdown."
248+
+ " Executor will shutdown only after all the Threads are completed.");
249+
executor.shutdown();
250+
251+
// Wait for SHUTDOWN_TIME seconds for all the threads to complete
252+
// their tasks and then shut down the executor and then exit.
253+
if (!executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
254+
LOGGER.info("Executor was shut down and Exiting.");
255+
executor.shutdownNow();
256+
}
257+
} catch (Exception e) {
258+
LOGGER.error(e.getMessage());
259+
}
260+
}
261+
}
262+
```
263+
264+
The console output
265+
```
266+
[main] INFO App - Submitting TaskGenerators and ServiceExecutor threads.
267+
[main] INFO App - Initiating shutdown. Executor will shutdown only after all the Threads are completed.
268+
[pool-1-thread-2] INFO TaskGenerator - Message-1 submitted by pool-1-thread-2
269+
[pool-1-thread-1] INFO TaskGenerator - Message-5 submitted by pool-1-thread-1
270+
[pool-1-thread-1] INFO TaskGenerator - Message-4 submitted by pool-1-thread-1
271+
[pool-1-thread-2] INFO TaskGenerator - Message-2 submitted by pool-1-thread-2
272+
[pool-1-thread-1] INFO TaskGenerator - Message-3 submitted by pool-1-thread-1
273+
[pool-1-thread-2] INFO TaskGenerator - Message-1 submitted by pool-1-thread-2
274+
[pool-1-thread-1] INFO TaskGenerator - Message-2 submitted by pool-1-thread-1
275+
[pool-1-thread-2] INFO ServiceExecutor - Message-1 submitted by pool-1-thread-2 is served.
276+
[pool-1-thread-1] INFO TaskGenerator - Message-1 submitted by pool-1-thread-1
277+
[pool-1-thread-2] INFO ServiceExecutor - Message-5 submitted by pool-1-thread-1 is served.
278+
[pool-1-thread-2] INFO ServiceExecutor - Message-4 submitted by pool-1-thread-1 is served.
279+
[pool-1-thread-2] INFO ServiceExecutor - Message-2 submitted by pool-1-thread-2 is served.
280+
[pool-1-thread-2] INFO ServiceExecutor - Message-3 submitted by pool-1-thread-1 is served.
281+
[pool-1-thread-2] INFO ServiceExecutor - Message-1 submitted by pool-1-thread-2 is served.
282+
[pool-1-thread-2] INFO ServiceExecutor - Message-2 submitted by pool-1-thread-1 is served.
283+
[pool-1-thread-2] INFO ServiceExecutor - Message-1 submitted by pool-1-thread-1 is served.
284+
[pool-1-thread-2] INFO ServiceExecutor - Service Executor: Waiting for Messages to serve ..
285+
[pool-1-thread-2] INFO ServiceExecutor - Service Executor: Waiting for Messages to serve ..
286+
[pool-1-thread-2] INFO ServiceExecutor - Service Executor: Waiting for Messages to serve ..
287+
[pool-1-thread-2] INFO ServiceExecutor - Service Executor: Waiting for Messages to serve ..
288+
[main] INFO App - Executor was shut down and Exiting.
289+
[pool-1-thread-2] ERROR ServiceExecutor - sleep interrupted
290+
```
291+
17292
## Class diagram
18293
![alt text](./etc/queue-load-leveling.gif "queue-load-leveling")
19294

@@ -25,10 +300,8 @@ for both the task and the service.
25300
## Tutorials
26301
* [Queue-Based Load Leveling Pattern](http://java-design-patterns.com/blog/queue-load-leveling/)
27302

28-
## Real world example
29-
30-
* A Microsoft Azure web role stores data by using a separate storage service. If a large number of instances of the web role run concurrently, it is possible that the storage service could be overwhelmed and be unable to respond to requests quickly enough to prevent these requests from timing out or failing.
31303

32304
## Credits
33305

34306
* [Queue-Based Load Leveling pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/queue-based-load-leveling)
307+
* [Load-Balancing](https://www.wikiwand.com/en/Load_balancing_(computing))

0 commit comments

Comments
 (0)